最近开发中,本地服务需要grpc调用开发环境的云端服务进行调试,云端服务并未开放外部端口,因此想到了 k8s 的 portforward 功能。此前也只是在书本或文章中获知有此功能,未深入了解和使用。趁着这次机会,对 portforward 进行了一番研究,与大家分析一下自己的历程。
portforward 的调用流程大概如下:
首先会在本地通过kubectl开启一个使用spdy协议的 reset server,把本地的请求发往k8s对应的pod的子资源 portforward。
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
// 此处省略无关代码
// ...
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
// 此处省略无关代码
// ...
return fw.ForwardPorts()
}
// 此处省略无关代码
// ...
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
pod, err := o.PodClient.Pods(o.Namespace).Get(context.TODO(), o.PodName, metav1.GetOptions{})
// 此处省略无关代码
// ...
req := o.RESTClient.Post().
Resource("pods").
Namespace(o.Namespace).
Name(pod.Name).
SubResource("portforward")
return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
}
注:kubectl 源码片断(kubectl/pkg/cmd/portforward/portforward.go)
请求到了k8s api server 端,
// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
//...
location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
//...
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *proxy.UpgradeAwareHandler {
handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}
注:k8s 源码片断(kubectl/pkg/cmd/portforward/portforward.go)
在kubelet server中监听/portForward,找到对应的pod,将请求代理给它。
// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
params := getPortForwardRequestParams(request)
portForwardOptions, err := portforward.NewV4Options(request.Request)
//...
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
//...
url, err := s.host.GetPortForward(request.Request.Context(), pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
//...
proxyStream(response.ResponseWriter, request.Request, url)
}
注:k8s 源码片断(kubernetes/pkg/kubelet/server/server.go)
同时,容器运行时cri要实现PortForward接口,到此,就将本地的请求转发到了目标pod了。
// PortForwarder knows how to forward content from a data stream to/from a port in a pod.
type PortForwarder interface {
// PortForwarder copies data between a data stream and a port in a pod.
PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
}
// ServePortForward handles a port forwarding request. A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
var err error
if wsstream.IsWebSocketRequest(req) {
err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
} else {
err = handleHTTPStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
}
if err != nil {
runtime.HandleError(err)
return
}
}
注:k8s 源码片断(kubernetes/pkg/kubelet/cri/streaming/portforward/portforward.go)
此至,整个portforward的流程代码在心里有了一个清晰的调用链流程了。