Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/config/v1/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
splugin.OpPing,
splugin.OpNewWorkConn,
splugin.OpNewUserConn,
splugin.OpCloseUserConn,
}
)

Expand Down
54 changes: 42 additions & 12 deletions pkg/plugin/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ import (
)

type Manager struct {
loginPlugins []Plugin
newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
loginPlugins []Plugin
newProxyPlugins []Plugin
closeProxyPlugins []Plugin
pingPlugins []Plugin
newWorkConnPlugins []Plugin
newUserConnPlugins []Plugin
closeUserConnPlugins []Plugin
}

func NewManager() *Manager {
return &Manager{
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
loginPlugins: make([]Plugin, 0),
newProxyPlugins: make([]Plugin, 0),
closeProxyPlugins: make([]Plugin, 0),
pingPlugins: make([]Plugin, 0),
newWorkConnPlugins: make([]Plugin, 0),
newUserConnPlugins: make([]Plugin, 0),
closeUserConnPlugins: make([]Plugin, 0),
}
}

Expand All @@ -63,6 +65,9 @@ func (m *Manager) Register(p Plugin) {
if p.IsSupport(OpNewUserConn) {
m.newUserConnPlugins = append(m.newUserConnPlugins, p)
}
if p.IsSupport(OpCloseUserConn) {
m.closeUserConnPlugins = append(m.closeUserConnPlugins, p)
}
}

func (m *Manager) Login(content *LoginContent) (*LoginContent, error) {
Expand Down Expand Up @@ -259,3 +264,28 @@ func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent,
}
return content, nil
}

func (m *Manager) CloseUserConn(content *CloseUserConnContent) error {
if len(m.closeUserConnPlugins) == 0 {
return nil
}

errs := make([]string, 0)
reqid, _ := util.RandID()
xl := xlog.New().AppendPrefix("reqid: " + reqid)
ctx := xlog.NewContext(context.Background(), xl)
ctx = NewReqidContext(ctx, reqid)

for _, p := range m.closeUserConnPlugins {
_, _, err := p.Handle(ctx, OpCloseUserConn, *content)
if err != nil {
xl.Warnf("send CloseUserConn request to plugin [%s] error: %v", p.Name(), err)
errs = append(errs, fmt.Sprintf("[%s]: %v", p.Name(), err))
}
}

if len(errs) > 0 {
return fmt.Errorf("send CloseUserConn request to plugin errors: %s", strings.Join(errs, "; "))
}
return nil
}
13 changes: 7 additions & 6 deletions pkg/plugin/server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
const (
APIVersion = "0.1.0"

OpLogin = "Login"
OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
OpLogin = "Login"
OpNewProxy = "NewProxy"
OpCloseProxy = "CloseProxy"
OpPing = "Ping"
OpNewWorkConn = "NewWorkConn"
OpNewUserConn = "NewUserConn"
OpCloseUserConn = "CloseUserConn"
)

type Plugin interface {
Expand Down
7 changes: 7 additions & 0 deletions pkg/plugin/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,10 @@ type NewUserConnContent struct {
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"`
}

type CloseUserConnContent struct {
User UserInfo `json:"user"`
ProxyName string `json:"proxy_name"`
ProxyType string `json:"proxy_type"`
RemoteAddr string `json:"remote_addr"`
}
11 changes: 10 additions & 1 deletion server/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
xl.Warnf("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
return
}
defer func() {
_ = rc.PluginManager.CloseUserConn(&plugin.CloseUserConnContent{
User: content.User,
ProxyName: content.ProxyName,
ProxyType: content.ProxyType,
RemoteAddr: content.RemoteAddr,
})
}()

// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
Expand Down Expand Up @@ -294,12 +302,13 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {

name := pxy.GetName()
proxyType := cfg.Type
userRemoteAddr := userConn.RemoteAddr().String()
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount, _ := libio.Join(local, userConn)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debugf("join connections closed")
xl.Debugf("join connections closed, userConn(r[%s])", userRemoteAddr)
}

type Options struct {
Expand Down
46 changes: 46 additions & 0 deletions test/e2e/legacy/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,52 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
})
})

ginkgo.Describe("CloseUserConn", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
r.Content = &plugin.CloseUserConnContent{}
return &r
}
ginkgo.It("Validate Info", func() {
localPort := f.AllocPort()

var record string
handler := func(req *plugin.Request) *plugin.Response {
var ret plugin.Response
content := req.Content.(*plugin.CloseUserConnContent)
record = content.RemoteAddr
return &ret
}
pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil)

f.RunServer("", pluginServer)

serverConf := consts.LegacyDefaultServerConfig + fmt.Sprintf(`
[plugin.test]
addr = 127.0.0.1:%d
path = /handler
ops = CloseUserConn
`, localPort)

remotePort := f.AllocPort()
clientConf := consts.LegacyDefaultClientConfig
clientConf += fmt.Sprintf(`
[tcp]
type = tcp
local_port = {{ .%s }}
remote_port = %d
`, framework.TCPEchoServerPort, remotePort)

f.RunProcesses(serverConf, []string{clientConf})

framework.NewRequestExpect(f).Port(remotePort).Ensure()

time.Sleep(1 * time.Second)

framework.ExpectNotEqual("", record)
})
})

ginkgo.Describe("HTTPS Protocol", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
Expand Down
48 changes: 48 additions & 0 deletions test/e2e/v1/plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,54 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() {
})
})

ginkgo.Describe("CloseUserConn", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
r.Content = &plugin.CloseUserConnContent{}
return &r
}
ginkgo.It("Validate Info", func() {
localPort := f.AllocPort()

var record string
handler := func(req *plugin.Request) *plugin.Response {
var ret plugin.Response
content := req.Content.(*plugin.CloseUserConnContent)
record = content.RemoteAddr
return &ret
}
pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil)

f.RunServer("", pluginServer)

serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
[[httpPlugins]]
name = "test"
addr = "127.0.0.1:%d"
path = "/handler"
ops = ["CloseUserConn"]
`, localPort)

remotePort := f.AllocPort()
clientConf := consts.DefaultClientConfig
clientConf += fmt.Sprintf(`
[[proxies]]
name = "tcp"
type = "tcp"
localPort = {{ .%s }}
remotePort = %d
`, framework.TCPEchoServerPort, remotePort)

f.RunProcesses(serverConf, []string{clientConf})

framework.NewRequestExpect(f).Port(remotePort).Ensure()

time.Sleep(1 * time.Second)

framework.ExpectNotEqual("", record)
})
})

ginkgo.Describe("HTTPS Protocol", func() {
newFunc := func() *plugin.Request {
var r plugin.Request
Expand Down