-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
2282 lines (1940 loc) · 95.1 KB
/
server.py
File metadata and controls
2282 lines (1940 loc) · 95.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
server.py — PeerGlass MCP Server (Phase 1 + 2 + 3 + 4)
Gives Claude the ability to query all 5 global Regional Internet Registries
simultaneously using RDAP (the modern JSON replacement for legacy WHOIS),
validate routes via RPKI, check BGP routing table visibility, audit
organization resources, trace full historical allocation timelines, and
integrate with PeeringDB for community peering intelligence.
Note on protocol: this server uses RDAP exclusively for live registry
queries. RDAP (RFC 7480–7484) is the IANA-mandated successor to legacy
WHOIS. The only place "historical-whois" appears is as the name of a
RIPE Stat API endpoint — that is the upstream API's own name, not our
protocol choice.
Phase 1 Tools (5): RDAP registry lookups (Who owns this?)
Phase 2 Tools (4): RPKI + BGP routing security (Is it valid/safe?)
Phase 3 Tools (4): Historical timelines + global stats (What happened?)
Phase 4 Tools (4): PeeringDB + IXP + health + monitoring (What's happening now?)
"""
import json
from mcp.server.fastmcp import FastMCP
import cache as cache_module
import rir_client
from models import (
IPQueryInput,
ASNQueryInput,
AbuseContactInput,
RPKICheckInput,
BGPStatusInput,
AnnouncedPrefixesInput,
OrgAuditInput,
PrefixHistoryInput,
TransferDetectInput,
IPv4StatsInput,
PrefixOverviewInput,
PeeringInfoInput,
IXPLookupInput,
NetworkHealthInput,
ChangeMonitorInput,
ResponseFormat,
OrgAuditResult,
DNSResolveInput,
DNSEnumerateInput,
DNSSECInput,
DNSBLInput,
EmailSecurityInput,
DNSPropagationInput,
TLSInspectInput,
CTLogInput,
ThreatIntelInput,
PassiveDNSInput,
IRRCheckInput,
RouteLeakInput,
LookingGlassInput,
RouteStabilityInput,
ShutdownDetectInput,
MonitorRegisterInput,
ShutdownTimelineInput,
CensorshipProbeInput,
SatelliteConnectivityInput,
ChokePointInput,
OONIReportInput,
CountryHealthInput,
ASRelationshipInput,
GeoLookupInput,
AtlasTraceInput,
)
from normalizer import (
normalize_ip_response,
normalize_asn_response,
extract_abuse_contact,
)
from formatters import (
format_ip_results_md,
format_asn_results_md,
format_abuse_contact_md,
format_rir_status_md,
format_rpki_result_md,
format_bgp_status_md,
format_org_audit_md,
format_prefix_history_md,
format_transfer_detect_md,
format_ipv4_stats_md,
format_prefix_overview_md,
format_peering_info_md,
format_ixp_lookup_md,
format_network_health_md,
format_change_monitor_md,
format_dns_resolve_md,
format_dns_enumerate_md,
format_dns_dnssec_md,
format_dns_dnsbl_md,
format_email_security_md,
format_dns_propagation_md,
format_tls_inspect_md,
format_ct_logs_md,
format_threat_intel_md,
format_passive_dns_md,
format_irr_result_md,
format_route_leak_md,
format_looking_glass_md,
format_route_stability_md,
format_shutdown_detect_md,
format_monitor_register_md,
format_shutdown_timeline_md,
format_censorship_probe_md,
format_satellite_connectivity_md,
format_chokepoints_md,
format_ooni_report_md,
format_country_health_md,
format_as_relationships_md,
format_geo_lookup_md,
format_atlas_trace_md,
to_json,
)
# ──────────────────────────────────────────────────────────────
# MCP Server
# ──────────────────────────────────────────────────────────────
mcp = FastMCP(
"peerglass",
instructions="""
You are connected to PeerGlass — an internet resource intelligence MCP server.
Uses RDAP (the modern successor to legacy WHOIS) to query all 5 RIRs,
validates RPKI routes, checks BGP visibility, looks up PeeringDB peering
info and IXPs, runs combined health checks, and monitors resources for
registration and routing changes.
Quick tool selection guide:
Who owns this IP / ASN? → rir_query_ip / rir_query_asn
Find abuse contact? → rir_get_abuse_contact
Is the BGP route RPKI valid? → rir_check_rpki
Is it announced in BGP? → rir_check_bgp_status
What prefixes does an ASN route? → rir_get_announced_prefixes
All resources for an org? → rir_audit_org
Full ownership history? → rir_prefix_history
Ever been transferred? → rir_detect_transfers
Global IPv4/IPv6 exhaustion stats? → rir_ipv4_stats
Raw delegated IPv4 blocks? → rir_ipv4_stats (include_blocks=true + rir_filter)
Prefix parent/child hierarchy? → rir_prefix_overview
Peering policy + IXP presence? → rir_peering_info
IXPs in a country or by name? → rir_ixp_lookup
One-shot full health check? → rir_network_health
Detect registration/BGP changes? → rir_change_monitor
Power workflows:
BGP hijack: rir_network_health → rir_check_rpki → rir_prefix_overview
M&A due dilig.: rir_audit_org → rir_prefix_history → rir_detect_transfers
NOC incident: rir_network_health → rir_get_abuse_contact → rir_peering_info
Ongoing watch: rir_change_monitor (call repeatedly to detect drift)
IRR coverage? → rir_check_irr
Route leak? → rir_detect_route_leak
AS paths? → rir_looking_glass
Route stable? → rir_route_stability
Country shutdown? → peerglass_shutdown_detect
Register webhook? → peerglass_monitor_register
Shutdown history? → peerglass_shutdown_timeline
DNS censored? → peerglass_dns_censorship
Satellite internet? → peerglass_satellite_connectivity
Chokepoints? → peerglass_country_chokepoints
OONI censorship? → peerglass_ooni_report
Country health? → peerglass_country_health
AS relationships? → rir_as_relationships
IP geolocation? → peerglass_geo_lookup
Traceroute? → peerglass_atlas_trace
""",
)
# ──────────────────────────────────────────────────────────────
# Tool 1 — Query IP Address
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_query_ip",
annotations={
"title": "Query IP Address Across All 5 RIRs",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_query_ip(params: IPQueryInput) -> str:
"""
Query all 5 global RIRs simultaneously for an IP address using RDAP.
Fires parallel RDAP requests to AFRINIC, APNIC, ARIN, LACNIC, and RIPE NCC.
Exactly one RIR will be authoritative; the others return 'not found'.
Normalizes all responses into a unified schema and returns a clear summary.
Results are cached for 1 hour to respect RIR rate limits.
Args:
params (IPQueryInput):
- ip_address (str): IPv4 or IPv6 address e.g. '1.1.1.1', '2001:4860:4860::8888'
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: Registration details including prefix, owner, country, allocation date,
abuse email, and responses from all 5 RIRs.
JSON schema:
{
"ip": str,
"results": [{"rir": str, "status": str, "error": str|null}],
"normalized": [{"rir": str, "prefix": str, "name": str,
"org_name": str, "country": str,
"allocation_date": str, "abuse_email": str}]
}
"""
ip = params.ip_address.strip()
cache_key = cache_module.make_ip_key(ip)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
raw_results = await rir_client.query_ip_all_rirs(ip)
normalized = []
for result in raw_results:
if result.status == "ok" and result.data:
try:
normalized.append(normalize_ip_response(result.rir.value, result.data))
except Exception as exc:
result.error = f"Normalization error: {exc}"
md = format_ip_results_md(ip, normalized, raw_results)
jsn = to_json({
"ip": ip,
"results": [r.model_dump(exclude={"data"}) for r in raw_results],
"normalized": [n.model_dump(exclude={"raw"}) for n in normalized],
})
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_IP)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 2 — Query ASN
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_query_asn",
annotations={
"title": "Query ASN Across All 5 RIRs",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_query_asn(params: ASNQueryInput) -> str:
"""
Query all 5 global RIRs simultaneously for an Autonomous System Number.
An ASN is a unique number assigned to a network operator (ISP, CDN,
large enterprise) that participates in BGP routing. Examples:
AS15169 = Google, AS13335 = Cloudflare, AS36864 = AFRINIC itself.
Accepts: 'AS15169', '15169', or named sets like 'AS-GOOGLE'.
Results are cached for 1 hour.
Args:
params (ASNQueryInput):
- asn (str): ASN in any format (e.g. 'AS15169', '13335', 'AS-CLOUDFLARE')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: ASN registration details including owner, country, allocation date,
abuse email, and responses from all 5 RIRs.
JSON schema:
{
"asn": str,
"results": [{"rir": str, "status": str}],
"normalized": [{"rir": str, "asn": str, "name": str,
"org_name": str, "country": str, "abuse_email": str}]
}
"""
asn = params.asn.strip()
cache_key = cache_module.make_asn_key(asn)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
raw_results = await rir_client.query_asn_all_rirs(asn)
normalized = []
for result in raw_results:
if result.status == "ok" and result.data:
try:
normalized.append(normalize_asn_response(result.rir.value, result.data))
except Exception as exc:
result.error = f"Normalization error: {exc}"
md = format_asn_results_md(asn, normalized, raw_results)
jsn = to_json({
"asn": asn,
"results": [r.model_dump(exclude={"data"}) for r in raw_results],
"normalized": [n.model_dump(exclude={"raw"}) for n in normalized],
})
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_ASN)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 3 — Get Abuse Contact
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_get_abuse_contact",
annotations={
"title": "Get Abuse Contact for an IP Address",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_get_abuse_contact(params: AbuseContactInput) -> str:
"""
Find the abuse contact for any IP address globally using IANA bootstrap routing.
Uses IANA RDAP Bootstrap to identify the authoritative RIR first (efficient),
then queries only that RIR. Falls back to querying all 5 if bootstrap fails.
Extracts abuse contacts from entity roles: 'abuse', 'technical', 'noc'.
Use this tool as the first step in any network abuse reporting workflow:
spam, DDoS attacks, port scanning, credential stuffing, etc.
Results are cached for 1 hour.
Args:
params (AbuseContactInput):
- ip_address (str): IPv4 or IPv6 address (e.g. '185.220.101.1')
Returns:
str: Markdown report with abuse email(s), phone(s), network name,
organization, country, and authoritative RIR.
JSON schema:
{
"ip_address": str,
"authoritative_rir": str,
"abuse_email": [str],
"abuse_phone": [str],
"network_name": str,
"org_name": str,
"country": str
}
"""
ip = params.ip_address.strip()
cache_key = cache_module.make_abuse_key(ip)
cached = cache_module.get(cache_key)
if cached:
return cached
result = await rir_client.query_authoritative_rir(ip, "ip")
if not result or not result.data:
output = (
f"## 🚨 Abuse Contact: `{ip}`\n\n"
"> ⚠️ No registration data found for this address.\n"
"> It may be private/reserved space (RFC 1918, RFC 4193) or the query failed.\n\n"
"**Tip:** Run `rir_query_ip` for full diagnostics across all 5 RIRs.\n"
)
cache_module.set(cache_key, output, 300)
return output
contact = extract_abuse_contact(result.rir.value, ip, result.data)
output = format_abuse_contact_md(contact)
cache_module.set(cache_key, output, cache_module.TTL_ABUSE)
return output
# ──────────────────────────────────────────────────────────────
# Tool 4 — RIR Server Status
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_server_status",
annotations={
"title": "Check Health of All 5 RIR RDAP Servers",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": False,
"openWorldHint": True,
},
)
async def rir_server_status() -> str:
"""
Check the real-time health of all 5 RIR RDAP servers simultaneously.
Queries the /help endpoint of each RIR (lightweight, no IP/ASN needed).
Returns RDAP conformance levels and availability status.
Use this before bulk queries to verify connectivity, or to diagnose
why a specific RIR's responses are failing.
Returns:
str: Markdown table with RIR name, region, status (Online/Unreachable),
and supported RDAP conformance extensions.
"""
stats = await rir_client.get_rir_server_status()
stats_str = {str(k): v for k, v in stats.items()}
return format_rir_status_md(stats_str)
# ──────────────────────────────────────────────────────────────
# Tool 5 — Cache Statistics
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_cache_stats",
annotations={
"title": "View In-Memory Query Cache Statistics",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": False,
},
)
async def rir_cache_stats() -> str:
"""
View the current state of the in-memory TTL cache.
The cache prevents repeated queries to RIRs within short windows,
respecting rate limits and reducing latency for repeated lookups.
Returns:
str: JSON with cache entry counts (total, alive, expired)
and the configured TTL for each query type.
Schema:
{
"cache_stats": {"total_entries": int, "alive": int, "expired": int},
"ttl_seconds": {"ip": int, "asn": int, "org": int,
"abuse": int, "bgp": int, "rpki": int}
}
"""
return json.dumps({
"cache_stats": cache_module.stats(),
"ttl_seconds": {
"ip": cache_module.TTL_IP,
"asn": cache_module.TTL_ASN,
"org": cache_module.TTL_ORG,
"abuse": cache_module.TTL_ABUSE,
"bgp": cache_module.TTL_BGP,
"rpki": cache_module.TTL_RPKI,
},
}, indent=2)
# ──────────────────────────────────────────────────────────────
# Tool 6 — RPKI / ROA Validation [Phase 2]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_check_rpki",
annotations={
"title": "Validate RPKI/ROA Status for a Prefix + ASN",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_check_rpki(params: RPKICheckInput) -> str:
"""
Validate a prefix + ASN pair against the global RPKI using Cloudflare's validator.
RPKI (Resource Public Key Infrastructure) is the internet's route security
framework. RIRs issue Route Origin Authorizations (ROAs) — digital certificates
that cryptographically prove an ASN is authorized to announce a prefix.
Validity states:
✅ VALID — A matching ROA exists. Route is cryptographically authorized.
🚨 INVALID — A ROA exists but this ASN/prefix violates it. Possible hijack.
⚠️ NOT-FOUND — No ROA exists. Route is unverified (common, not inherently bad).
❓ UNKNOWN — Could not determine validity.
Combine with rir_check_bgp_status for full routing security assessment.
Results are cached for 15 minutes (ROAs can change, but not frequently).
Args:
params (RPKICheckInput):
- prefix (str): CIDR prefix e.g. '1.1.1.0/24' or '2400:cb00::/32'
- asn (str): Originating ASN e.g. 'AS13335' or '13335'
Returns:
str: RPKI validity state, description, and list of covering ROAs.
JSON schema:
{
"prefix": str, "asn": str,
"validity": "valid"|"invalid"|"not-found"|"unknown",
"covering_roas": [{"asn": int, "prefix": str, "maxLength": int}],
"description": str
}
"""
cache_key = cache_module.make_rpki_key(params.prefix, params.asn)
cached = cache_module.get(cache_key)
if cached:
return cached
result = await rir_client.check_rpki(params.prefix, params.asn)
output = format_rpki_result_md(result)
cache_module.set(cache_key, output, cache_module.TTL_RPKI)
return output
# ──────────────────────────────────────────────────────────────
# Tool 7 — BGP Routing Table Status [Phase 2]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_check_bgp_status",
annotations={
"title": "Check BGP Routing Table Visibility for a Prefix or ASN",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_check_bgp_status(params: BGPStatusInput) -> str:
"""
Check whether a prefix or ASN is currently visible in the global BGP routing table.
Uses RIPE Stat (which aggregates data from RIPE RIS route collectors worldwide).
BGP is the internet's routing protocol — the "GPS" that tells traffic how to
navigate from one network to another.
For a prefix, returns:
- Whether it is currently announced in BGP
- Which ASN(s) are announcing it (multiple = potential hijack)
- Percentage of global BGP peers that can see it (visibility)
For an ASN, returns:
- Whether the ASN has any active BGP announcements
- A list of all announced prefixes (use rir_get_announced_prefixes for details)
Combine with rir_check_rpki for complete routing security assessment.
Results are cached for 5 minutes (BGP tables change frequently).
Args:
params (BGPStatusInput):
- resource (str): Prefix (e.g. '1.1.1.0/24') or ASN (e.g. 'AS15169')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: BGP visibility, announcing ASNs, and prefix list.
JSON schema:
{
"resource": str, "resource_type": str, "is_announced": bool,
"announcing_asns": [str], "visibility_percent": float,
"announced_prefixes": [{"prefix": str, "peers_seeing": int}]
}
"""
resource = params.resource.strip()
cache_key = cache_module.make_bgp_key(resource)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
result = await rir_client.get_bgp_status(resource)
md = format_bgp_status_md(result)
jsn = to_json(result)
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_BGP)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 8 — Announced Prefixes by ASN [Phase 2]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_get_announced_prefixes",
annotations={
"title": "Get All BGP-Announced Prefixes for an ASN",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_get_announced_prefixes(params: AnnouncedPrefixesInput) -> str:
"""
Retrieve all IP prefixes currently being announced by an ASN in BGP.
This shows the ASN's complete routing footprint — every IP range it
is actively advertising to the global internet via BGP. Uses RIPE
Stat's announced-prefixes endpoint.
Useful for:
- Understanding an organization's complete IP footprint
- Detecting unexpected prefix announcements (possible hijacks)
- M&A due diligence on network assets
- Security research and threat intelligence
min_peers_seeing filters out unstable/flapping routes that only
a small number of BGP peers can see. Higher = more stable routes only.
Results are cached for 5 minutes.
Args:
params (AnnouncedPrefixesInput):
- asn (str): ASN to query (e.g. 'AS13335' or '15169')
- min_peers_seeing (int): Minimum peer count filter (default: 5)
Returns:
str: Complete list of announced prefixes with peer visibility and
first/last seen timestamps.
JSON schema:
{
"resource": str, "is_announced": bool,
"announced_prefixes": [
{"prefix": str, "peers_seeing": int,
"first_seen": str, "last_seen": str}
]
}
"""
asn = params.asn.strip()
cache_key = cache_module.make_bgp_key(f"prefixes:{asn}:{params.min_peers_seeing}")
cached = cache_module.get(cache_key)
if cached:
return cached
result = await rir_client.get_announced_prefixes(asn, params.min_peers_seeing)
output = format_bgp_status_md(result)
cache_module.set(cache_key, output, cache_module.TTL_BGP)
return output
# ──────────────────────────────────────────────────────────────
# Tool 9 — Organization Resource Audit [Phase 2]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_audit_org",
annotations={
"title": "Audit All Internet Resources for an Organization",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_audit_org(params: OrgAuditInput) -> str:
"""
Find all IP blocks and ASNs registered to an organization across all RIRs.
Searches RDAP entity databases at AFRINIC, APNIC, ARIN, and RIPE
(LACNIC does not support RDAP entity search — a limitation is noted).
Aggregates results into a unified inventory.
Use cases:
- M&A due diligence: What internet resources does Company X own globally?
- Security research: What is the full IP footprint of an organization?
- ICANN/RIR policy: Are resources distributed across multiple RIRs?
- Incident response: Did this org transfer/sell IP space recently?
Tips:
- Use org handles for precision (e.g. 'GOOGL-ARIN' not 'Google')
- Partial name matching is supported (e.g. 'Cloudflare' finds 'Cloudflare Inc.')
- Results are cached for 6 hours.
Args:
params (OrgAuditInput):
- org_name (str): Organization name or handle (e.g. 'Cloudflare', 'GOOGL-ARIN')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: Summary of all IP blocks and ASNs by RIR, with handles, names,
countries, and allocation dates.
JSON schema:
{
"org_query": str, "total_resources": int,
"ip_blocks": [{"rir": str, "handle": str, "prefix_or_asn": str,
"name": str, "country": str}],
"asns": [...],
"rirs_found_in": [str], "errors": [str]
}
"""
org = params.org_name.strip()
cache_key = cache_module.make_org_key(org)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
resources, errors = await rir_client.search_org_all_rirs(org)
ip_blocks = [r for r in resources if r.resource_type in ("ip", "entity")]
asns = [r for r in resources if r.resource_type == "asn"]
rirs_found = list({r.rir for r in resources})
audit = OrgAuditResult(
org_query = org,
total_resources = len(resources),
ip_blocks = ip_blocks,
asns = asns,
rirs_found_in = rirs_found,
errors = errors,
)
md = format_org_audit_md(audit)
jsn = to_json(audit)
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_ORG)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 10 — Prefix / ASN History [Phase 3]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_prefix_history",
annotations={
"title": "Full Registration History for a Prefix or ASN",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_prefix_history(params: PrefixHistoryInput) -> str:
"""
Fetch the complete registration history for an IP prefix or ASN.
Returns a chronological timeline of every change ever recorded:
- Initial registration (who, when, under which RIR)
- Org / maintainer changes (potential ownership transfers)
- Status changes (allocated → assigned → available)
- Last-changed updates
Uses RIPE Stat's historical-whois and allocation-history APIs.
Coverage is best for RIPE NCC resources; partial for other RIRs.
Use cases:
- "Has this IP block ever changed hands?"
- "When was this ASN first registered?"
- "What organization historically owned this prefix?"
- Due diligence, incident response, fraud investigation
Results are cached for 12 hours (historical records are stable).
Args:
params (PrefixHistoryInput):
- resource (str): IP prefix (e.g. '8.8.8.0/24') or ASN (e.g. 'AS15169')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: Chronological event timeline with dates, event types, and attribute changes.
JSON schema:
{
"resource": str, "resource_type": str,
"current_holder": str, "current_rir": str,
"registration_date": str, "total_events": int,
"events": [{"event_date": str, "event_type": str,
"attribute": str, "old_value": str, "new_value": str}],
"sources": [str], "errors": [str]
}
"""
resource = params.resource.strip()
cache_key = cache_module.make_history_key(resource)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
result = await rir_client.get_prefix_history(resource)
md = format_prefix_history_md(result)
jsn = to_json(result)
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_HISTORY)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 11 — Transfer Detection [Phase 3]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_detect_transfers",
annotations={
"title": "Detect Cross-Org and Cross-RIR Resource Transfers",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_detect_transfers(params: TransferDetectInput) -> str:
"""
Detect past ownership or cross-RIR transfers for an IP prefix or ASN.
An ownership transfer happens when a registered org (e.g. 'GOOGL-ARIN')
changes to another org ('META-1-ARIN') in the registration record.
A cross-RIR transfer is rarer — it means the resource physically moved
between registries (e.g. from ARIN to RIPE NCC after an acquisition).
Transfer types detected:
🏢 Org Change — The registering organization changed
🌍→🌎 Cross-RIR — The resource moved to a different RIR
🔄 Intra-RIR — Maintainer changed within the same RIR
How it works: compares consecutive historical WHOIS object versions.
If 'org' or 'mnt-by' changed between versions, a transfer is flagged.
If RIR-specific suffixes in the handles differ, it's cross-RIR.
Results are cached for 12 hours.
Args:
params (TransferDetectInput):
- resource (str): IP prefix (e.g. '8.8.8.0/24') or ASN (e.g. 'AS15169')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: List of detected transfers with dates, types, from/to org, and evidence.
JSON schema:
{
"resource": str, "resource_type": str,
"transfers_detected": int,
"transfers": [{"transfer_date": str, "transfer_type": str,
"from_org": str, "to_org": str,
"from_rir": str, "to_rir": str, "evidence": str}],
"current_holder": str, "first_registered": str,
"sources": [str], "notes": [str]
}
"""
resource = params.resource.strip()
cache_key = cache_module.make_transfer_key(resource)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
result = await rir_client.detect_transfers(resource)
md = format_transfer_detect_md(result)
jsn = to_json(result)
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_TRANSFER)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 12 — Global IPv4 / IPv6 / ASN Stats [Phase 3]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_ipv4_stats",
annotations={
"title": "Global IPv4 / IPv6 / ASN Exhaustion Dashboard",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_ipv4_stats(params: IPv4StatsInput) -> str:
"""
Fetch global IPv4, IPv6, and ASN allocation statistics from all 5 RIRs.
Parses the NRO Extended Delegation Stats files — the authoritative daily
publication of how each RIR has distributed address space:
- How many IPv4 prefixes allocated (to ISPs) vs assigned (to end users)
- Remaining free IPv4 pool (where published — most RIRs are exhausted)
- IPv6 prefix count and growth
- Total ASNs issued
Why this matters:
- IPv4 was exhausted at IANA in 2011
- APNIC exhausted in 2011, RIPE in 2012, ARIN in 2015
- LACNIC near exhaustion 2020, AFRINIC followed
- IPv6 transition is the only long-term solution
- This tool lets you track adoption and exhaustion state in real time
Results are cached for 24 hours (stats files are published once daily).
Args:
params (IPv4StatsInput):
- rir_filter (str, optional): Filter to one RIR ('AFRINIC', 'APNIC',
'ARIN', 'LACNIC', 'RIPE'). Leave empty for all 5.
- include_blocks (bool): Include raw delegated IPv4 blocks for the selected RIR.
Requires rir_filter to be set.
- status_filter (str, optional): allocated | assigned | available (free is normalized).
- country_filter (str, optional): 2-letter country code filter (e.g. 'GH', 'ZA').
- limit (int): Max number of block rows when include_blocks=true.
- offset (int): Pagination offset for block rows.
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: Per-RIR table of IPv4/IPv6/ASN counts with totals and exhaustion context.
JSON schema:
{
"queried_at": str,
"rirs": [{"rir": str, "region": str,
"ipv4_total_prefixes": int, "ipv4_allocated": int,
"ipv4_assigned": int, "ipv4_available": int,
"ipv6_total_prefixes": int, "asn_total": int,
"stats_date": str}],
"global_ipv4_prefixes": int,
"global_ipv6_prefixes": int,
"global_asns": int,
"ipv4_blocks": [
{"rir": str, "country": str|null, "start_ip": str, "end_ip": str,
"address_count": int, "date": str|null, "status": str}
],
"blocks_total": int,
"blocks_returned": int,
"blocks_limit": int|null,
"blocks_offset": int|null,
"blocks_filters": {"rir_filter": str|null, "status_filter": str|null, "country_filter": str|null}
}
"""
rir_filter = (params.rir_filter or "").upper().strip() or "all"
cache_key = cache_module.make_ipv4stat_key(
rir_filter=rir_filter,
include_blocks=params.include_blocks,
status_filter=params.status_filter,
country_filter=params.country_filter,
limit=params.limit,
offset=params.offset,
)
cached = cache_module.get(cache_key)
if cached:
return cached["json"] if params.response_format == ResponseFormat.JSON else cached["markdown"]
result = await rir_client.get_global_ipv4_stats(
rir_filter=params.rir_filter or None,
include_blocks=params.include_blocks,
status_filter=params.status_filter,
country_filter=params.country_filter,
limit=params.limit,
offset=params.offset,
)
md = format_ipv4_stats_md(result)
jsn = to_json(result)
cache_module.set(cache_key, {"markdown": md, "json": jsn}, cache_module.TTL_IPV4STAT)
return jsn if params.response_format == ResponseFormat.JSON else md
# ──────────────────────────────────────────────────────────────
# Tool 13 — Prefix Overview / Hierarchy [Phase 3]
# ──────────────────────────────────────────────────────────────
@mcp.tool(
name="rir_prefix_overview",
annotations={
"title": "Prefix Hierarchy: Parent, Children, Siblings, and BGP Status",
"readOnlyHint": True,
"destructiveHint": False,
"idempotentHint": True,
"openWorldHint": True,
},
)
async def rir_prefix_overview(params: PrefixOverviewInput) -> str:
"""
Fetch a rich hierarchical overview of an IP prefix.
IP address space is organized in a tree structure:
- A /8 contains 256 /16s, each /16 contains 256 /24s, and so on.
- 'Less-specific' = the parent block this prefix was carved from.
- 'More-specific' = smaller blocks assigned within this prefix.
Think of it like a real estate map:
Less-specific = the city block (containing your property)
The prefix itself = your land parcel
More-specific = subdivisions within your parcel
This tool fetches all three layers in parallel (3 RIPE Stat API calls
simultaneously) and returns a unified view including:
- Current holder and announcement status
- Which ASN(s) are announcing it (multiple = potential hijack)
- All less-specific (parent) prefixes up the tree
- All more-specific (child) prefixes within the block
Combine with rir_check_rpki to validate the announcing ASN.
Results are cached for 1 hour.
Args:
params (PrefixOverviewInput):
- prefix (str): IP prefix in CIDR notation (e.g. '1.1.1.0/24')
- response_format (str): 'markdown' (default) or 'json'
Returns:
str: Holder info, BGP status, and full prefix hierarchy table.
JSON schema:
{
"prefix": str, "holder": str, "rir": str, "country": str,
"announced": bool, "announcing_asns": [str],
"allocation_status": str,
"related_prefixes": [{"prefix": str, "relationship": str,
"holder": str, "origin_asn": str}],
"errors": [str]
}
"""