-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathswap_math.py
More file actions
139 lines (122 loc) · 4.53 KB
/
swap_math.py
File metadata and controls
139 lines (122 loc) · 4.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
swap_math.py — 单步 swap 计算
对应 Solidity 中 SwapMath.computeSwapStep。
在一个 tick 区间内,给定当前 sqrtPrice、目标 sqrtPrice、流动性和剩余输入量,
计算实际消耗量、产出量和手续费。
"""
from dataclasses import dataclass
from .full_math import mul_div, mul_div_rounding_up, Q96
from .sqrt_price_math import (
get_next_sqrt_price_from_input,
get_next_sqrt_price_from_output,
get_amount0_delta,
get_amount1_delta,
)
@dataclass
class StepResult:
"""单步 swap 的计算结果。"""
sqrt_price_next_x96: int
amount_in: int
amount_out: int
fee_amount: int
def compute_swap_step(
sqrt_ratio_current_x96: int,
sqrt_ratio_target_x96: int,
liquidity: int,
amount_remaining: int,
fee_pips: int,
) -> StepResult:
"""
在一个 tick 区间内执行 swap 计算。
Args:
sqrt_ratio_current_x96: 当前 sqrtPriceX96
sqrt_ratio_target_x96: 目标 sqrtPriceX96(下一个初始化 tick 的价格)
liquidity: 当前活跃流动性
amount_remaining: 剩余输入量(含 fee 的原始输入)
fee_pips: 手续费(百万分之几,如 3000 = 0.3%)
Returns:
StepResult 包含新价格、实际输入、输出和 fee
"""
zero_for_one = sqrt_ratio_current_x96 >= sqrt_ratio_target_x96
exact_in = amount_remaining >= 0
if exact_in:
# 扣除 fee 后的最大可用输入
amount_remaining_less_fee = mul_div(
amount_remaining, 1_000_000 - fee_pips, 1_000_000
)
# 填满整个区间所需的输入量
if zero_for_one:
amount_in = get_amount0_delta(
sqrt_ratio_target_x96, sqrt_ratio_current_x96, liquidity,
round_up=True,
)
else:
amount_in = get_amount1_delta(
sqrt_ratio_current_x96, sqrt_ratio_target_x96, liquidity,
round_up=True,
)
# 如果可用输入不足以到达目标价格
if amount_remaining_less_fee >= amount_in:
sqrt_price_next = sqrt_ratio_target_x96
else:
sqrt_price_next = get_next_sqrt_price_from_input(
sqrt_ratio_current_x96, liquidity,
amount_remaining_less_fee, zero_for_one,
)
else:
# exactOutput: amount_remaining 是负数,表示目标输出量
if zero_for_one:
amount_out = get_amount1_delta(
sqrt_ratio_target_x96, sqrt_ratio_current_x96, liquidity,
round_up=False,
)
else:
amount_out = get_amount0_delta(
sqrt_ratio_current_x96, sqrt_ratio_target_x96, liquidity,
round_up=False,
)
if abs(amount_remaining) >= amount_out:
sqrt_price_next = sqrt_ratio_target_x96
else:
sqrt_price_next = get_next_sqrt_price_from_output(
sqrt_ratio_current_x96, liquidity,
abs(amount_remaining), zero_for_one,
)
max_reached = sqrt_price_next == sqrt_ratio_target_x96
# 计算实际 amount_in 和 amount_out
if zero_for_one:
if not (max_reached and exact_in):
amount_in = get_amount0_delta(
sqrt_price_next, sqrt_ratio_current_x96, liquidity,
round_up=True,
)
if not (max_reached and not exact_in):
amount_out = get_amount1_delta(
sqrt_price_next, sqrt_ratio_current_x96, liquidity,
round_up=False,
)
else:
if not (max_reached and exact_in):
amount_in = get_amount1_delta(
sqrt_ratio_current_x96, sqrt_price_next, liquidity,
round_up=True,
)
if not (max_reached and not exact_in):
amount_out = get_amount0_delta(
sqrt_ratio_current_x96, sqrt_price_next, liquidity,
round_up=False,
)
# 限制 amount_out 不超过 remaining(exactOutput 场景)
if not exact_in and amount_out > abs(amount_remaining):
amount_out = abs(amount_remaining)
# 计算 fee
if exact_in and sqrt_price_next != sqrt_ratio_target_x96:
fee_amount = amount_remaining - amount_in
else:
fee_amount = mul_div_rounding_up(amount_in, fee_pips, 1_000_000 - fee_pips)
return StepResult(
sqrt_price_next_x96=sqrt_price_next,
amount_in=amount_in,
amount_out=amount_out,
fee_amount=fee_amount,
)