diff --git a/pkg/config/v1/validation/validation.go b/pkg/config/v1/validation/validation.go index 4ca6b67f0f0..8a04a755e63 100644 --- a/pkg/config/v1/validation/validation.go +++ b/pkg/config/v1/validation/validation.go @@ -55,6 +55,7 @@ var ( splugin.OpPing, splugin.OpNewWorkConn, splugin.OpNewUserConn, + splugin.OpCloseUserConn, } ) diff --git a/pkg/plugin/server/manager.go b/pkg/plugin/server/manager.go index dabfb46cbd0..99feda31fb7 100644 --- a/pkg/plugin/server/manager.go +++ b/pkg/plugin/server/manager.go @@ -25,22 +25,24 @@ import ( ) type Manager struct { - loginPlugins []Plugin - newProxyPlugins []Plugin - closeProxyPlugins []Plugin - pingPlugins []Plugin - newWorkConnPlugins []Plugin - newUserConnPlugins []Plugin + loginPlugins []Plugin + newProxyPlugins []Plugin + closeProxyPlugins []Plugin + pingPlugins []Plugin + newWorkConnPlugins []Plugin + newUserConnPlugins []Plugin + closeUserConnPlugins []Plugin } func NewManager() *Manager { return &Manager{ - loginPlugins: make([]Plugin, 0), - newProxyPlugins: make([]Plugin, 0), - closeProxyPlugins: make([]Plugin, 0), - pingPlugins: make([]Plugin, 0), - newWorkConnPlugins: make([]Plugin, 0), - newUserConnPlugins: make([]Plugin, 0), + loginPlugins: make([]Plugin, 0), + newProxyPlugins: make([]Plugin, 0), + closeProxyPlugins: make([]Plugin, 0), + pingPlugins: make([]Plugin, 0), + newWorkConnPlugins: make([]Plugin, 0), + newUserConnPlugins: make([]Plugin, 0), + closeUserConnPlugins: make([]Plugin, 0), } } @@ -63,6 +65,9 @@ func (m *Manager) Register(p Plugin) { if p.IsSupport(OpNewUserConn) { m.newUserConnPlugins = append(m.newUserConnPlugins, p) } + if p.IsSupport(OpCloseUserConn) { + m.closeUserConnPlugins = append(m.closeUserConnPlugins, p) + } } func (m *Manager) Login(content *LoginContent) (*LoginContent, error) { @@ -259,3 +264,28 @@ func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent, } return content, nil } + +func (m *Manager) CloseUserConn(content *CloseUserConnContent) error { + if len(m.closeUserConnPlugins) == 0 { + return nil + } + + errs := make([]string, 0) + reqid, _ := util.RandID() + xl := xlog.New().AppendPrefix("reqid: " + reqid) + ctx := xlog.NewContext(context.Background(), xl) + ctx = NewReqidContext(ctx, reqid) + + for _, p := range m.closeUserConnPlugins { + _, _, err := p.Handle(ctx, OpCloseUserConn, *content) + if err != nil { + xl.Warnf("send CloseUserConn request to plugin [%s] error: %v", p.Name(), err) + errs = append(errs, fmt.Sprintf("[%s]: %v", p.Name(), err)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("send CloseUserConn request to plugin errors: %s", strings.Join(errs, "; ")) + } + return nil +} diff --git a/pkg/plugin/server/plugin.go b/pkg/plugin/server/plugin.go index 3d3c8cfdd65..4b283fe4007 100644 --- a/pkg/plugin/server/plugin.go +++ b/pkg/plugin/server/plugin.go @@ -21,12 +21,13 @@ import ( const ( APIVersion = "0.1.0" - OpLogin = "Login" - OpNewProxy = "NewProxy" - OpCloseProxy = "CloseProxy" - OpPing = "Ping" - OpNewWorkConn = "NewWorkConn" - OpNewUserConn = "NewUserConn" + OpLogin = "Login" + OpNewProxy = "NewProxy" + OpCloseProxy = "CloseProxy" + OpPing = "Ping" + OpNewWorkConn = "NewWorkConn" + OpNewUserConn = "NewUserConn" + OpCloseUserConn = "CloseUserConn" ) type Plugin interface { diff --git a/pkg/plugin/server/types.go b/pkg/plugin/server/types.go index 4a5b7527185..33b750108c9 100644 --- a/pkg/plugin/server/types.go +++ b/pkg/plugin/server/types.go @@ -69,3 +69,10 @@ type NewUserConnContent struct { ProxyType string `json:"proxy_type"` RemoteAddr string `json:"remote_addr"` } + +type CloseUserConnContent struct { + User UserInfo `json:"user"` + ProxyName string `json:"proxy_name"` + ProxyType string `json:"proxy_type"` + RemoteAddr string `json:"remote_addr"` +} diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 5b7898eb0d9..fb48de93602 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -259,6 +259,14 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { xl.Warnf("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err) return } + defer func() { + _ = rc.PluginManager.CloseUserConn(&plugin.CloseUserConnContent{ + User: content.User, + ProxyName: content.ProxyName, + ProxyType: content.ProxyType, + RemoteAddr: content.RemoteAddr, + }) + }() // try all connections from the pool workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr()) @@ -294,12 +302,13 @@ func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) { name := pxy.GetName() proxyType := cfg.Type + userRemoteAddr := userConn.RemoteAddr().String() metrics.Server.OpenConnection(name, proxyType) inCount, outCount, _ := libio.Join(local, userConn) metrics.Server.CloseConnection(name, proxyType) metrics.Server.AddTrafficIn(name, proxyType, inCount) metrics.Server.AddTrafficOut(name, proxyType, outCount) - xl.Debugf("join connections closed") + xl.Debugf("join connections closed, userConn(r[%s])", userRemoteAddr) } type Options struct { diff --git a/test/e2e/legacy/plugin/server.go b/test/e2e/legacy/plugin/server.go index b00b1bac357..d5e38f20311 100644 --- a/test/e2e/legacy/plugin/server.go +++ b/test/e2e/legacy/plugin/server.go @@ -350,6 +350,52 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() { }) }) + ginkgo.Describe("CloseUserConn", func() { + newFunc := func() *plugin.Request { + var r plugin.Request + r.Content = &plugin.CloseUserConnContent{} + return &r + } + ginkgo.It("Validate Info", func() { + localPort := f.AllocPort() + + var record string + handler := func(req *plugin.Request) *plugin.Response { + var ret plugin.Response + content := req.Content.(*plugin.CloseUserConnContent) + record = content.RemoteAddr + return &ret + } + pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil) + + f.RunServer("", pluginServer) + + serverConf := consts.LegacyDefaultServerConfig + fmt.Sprintf(` + [plugin.test] + addr = 127.0.0.1:%d + path = /handler + ops = CloseUserConn + `, localPort) + + remotePort := f.AllocPort() + clientConf := consts.LegacyDefaultClientConfig + clientConf += fmt.Sprintf(` + [tcp] + type = tcp + local_port = {{ .%s }} + remote_port = %d + `, framework.TCPEchoServerPort, remotePort) + + f.RunProcesses(serverConf, []string{clientConf}) + + framework.NewRequestExpect(f).Port(remotePort).Ensure() + + time.Sleep(1 * time.Second) + + framework.ExpectNotEqual("", record) + }) + }) + ginkgo.Describe("HTTPS Protocol", func() { newFunc := func() *plugin.Request { var r plugin.Request diff --git a/test/e2e/v1/plugin/server.go b/test/e2e/v1/plugin/server.go index fd684ef5d4e..8cabcbd5659 100644 --- a/test/e2e/v1/plugin/server.go +++ b/test/e2e/v1/plugin/server.go @@ -365,6 +365,54 @@ var _ = ginkgo.Describe("[Feature: Server-Plugins]", func() { }) }) + ginkgo.Describe("CloseUserConn", func() { + newFunc := func() *plugin.Request { + var r plugin.Request + r.Content = &plugin.CloseUserConnContent{} + return &r + } + ginkgo.It("Validate Info", func() { + localPort := f.AllocPort() + + var record string + handler := func(req *plugin.Request) *plugin.Response { + var ret plugin.Response + content := req.Content.(*plugin.CloseUserConnContent) + record = content.RemoteAddr + return &ret + } + pluginServer := pluginpkg.NewHTTPPluginServer(localPort, newFunc, handler, nil) + + f.RunServer("", pluginServer) + + serverConf := consts.DefaultServerConfig + fmt.Sprintf(` + [[httpPlugins]] + name = "test" + addr = "127.0.0.1:%d" + path = "/handler" + ops = ["CloseUserConn"] + `, localPort) + + remotePort := f.AllocPort() + clientConf := consts.DefaultClientConfig + clientConf += fmt.Sprintf(` + [[proxies]] + name = "tcp" + type = "tcp" + localPort = {{ .%s }} + remotePort = %d + `, framework.TCPEchoServerPort, remotePort) + + f.RunProcesses(serverConf, []string{clientConf}) + + framework.NewRequestExpect(f).Port(remotePort).Ensure() + + time.Sleep(1 * time.Second) + + framework.ExpectNotEqual("", record) + }) + }) + ginkgo.Describe("HTTPS Protocol", func() { newFunc := func() *plugin.Request { var r plugin.Request