searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

浅析k8s的portforward

2023-06-30 06:03:23
67
0

最近开发中,本地服务需要grpc调用开发环境的云端服务进行调试,云端服务并未开放外部端口,因此想到了 k8s 的 portforward 功能。此前也只是在书本或文章中获知有此功能,未深入了解和使用。趁着这次机会,对 portforward 进行了一番研究,与大家分析一下自己的历程。

portforward 的调用流程大概如下:

首先会在本地通过kubectl开启一个使用spdy协议的 reset server,把本地的请求发往k8s对应的pod的子资源 portforward。

func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
	// 此处省略无关代码
        // ...
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
	// 此处省略无关代码
        // ...
	return fw.ForwardPorts()
}
// 此处省略无关代码
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
	pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
	// 此处省略无关代码
        // ...
	req := o.RESTClient.Post().
		Resource("pods").
		Namespace(o.Namespace).
		Name(pod.Name).
		SubResource("portforward")
	return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}

注:kubectl 源码片断(kubectl/pkg/cmd/portforward/portforward.go)

请求到了k8s api server 端,

// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
	//...
	location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
	//...
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

注:k8s 源码片断(kubectl/pkg/cmd/portforward/portforward.go)

在kubelet server中监听/portForward,找到对应的pod,将请求代理给它。

// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
	params := getPortForwardRequestParams(request)
	portForwardOptions, err := portforward.NewV4Options(request.Request)
	//...
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	//...
	url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
	//...
	proxyStream(response.ResponseWriter, request.Request, url)
}

注:k8s 源码片断(kubernetes/pkg/kubelet/server/server.go)

同时,容器运行时cri要实现PortForward接口,到此,就将本地的请求转发到了目标pod了。

// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
	// PortForwarder copies data between a data stream and a port in a pod.
	PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}

// ServePortForward handles a port forwarding request.  A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
	var err error
	if wsstream.IsWebSocketRequest(req) {
		err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
	} else {
		err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
	}

	if err != nil {
		runtime.HandleError(err)
		return
	}
}

注:k8s 源码片断(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)

此至,整个portforward的流程代码在心里有了一个清晰的调用链流程了。

0条评论
0 / 1000
杜****中
4文章数
0粉丝数
杜****中
4 文章 | 0 粉丝
杜****中
4文章数
0粉丝数
杜****中
4 文章 | 0 粉丝
原创

浅析k8s的portforward

2023-06-30 06:03:23
67
0

最近开发中,本地服务需要grpc调用开发环境的云端服务进行调试,云端服务并未开放外部端口,因此想到了 k8s 的 portforward 功能。此前也只是在书本或文章中获知有此功能,未深入了解和使用。趁着这次机会,对 portforward 进行了一番研究,与大家分析一下自己的历程。

portforward 的调用流程大概如下:

首先会在本地通过kubectl开启一个使用spdy协议的 reset server,把本地的请求发往k8s对应的pod的子资源 portforward。

func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
	// 此处省略无关代码
        // ...
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
	// 此处省略无关代码
        // ...
	return fw.ForwardPorts()
}
// 此处省略无关代码
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
	pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
	// 此处省略无关代码
        // ...
	req := o.RESTClient.Post().
		Resource("pods").
		Namespace(o.Namespace).
		Name(pod.Name).
		SubResource("portforward")
	return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}

注:kubectl 源码片断(kubectl/pkg/cmd/portforward/portforward.go)

请求到了k8s api server 端,

// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
	//...
	location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
	//...
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
	return handler
}

注:k8s 源码片断(kubectl/pkg/cmd/portforward/portforward.go)

在kubelet server中监听/portForward,找到对应的pod,将请求代理给它。

// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
	params := getPortForwardRequestParams(request)
	portForwardOptions, err := portforward.NewV4Options(request.Request)
	//...
	pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
	//...
	url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
	//...
	proxyStream(response.ResponseWriter, request.Request, url)
}

注:k8s 源码片断(kubernetes/pkg/kubelet/server/server.go)

同时,容器运行时cri要实现PortForward接口,到此,就将本地的请求转发到了目标pod了。

// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
	// PortForwarder copies data between a data stream and a port in a pod.
	PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}

// ServePortForward handles a port forwarding request.  A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
	var err error
	if wsstream.IsWebSocketRequest(req) {
		err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
	} else {
		err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
	}

	if err != nil {
		runtime.HandleError(err)
		return
	}
}

注:k8s 源码片断(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)

此至,整个portforward的流程代码在心里有了一个清晰的调用链流程了。

文章来自个人专栏
GordonDu个人专栏
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0