searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

envoy源码分析:线程模型与沙箱机制

2024-07-05 09:56:43
158
0

线程模型

envoy是一个多线程模型,参考官方的说法,envoy会起一个main thread主线程,主要负责做任务调度,统计汇总等。

同时envoy会根据核数起多个worker线程,主要负责具体的业务,诸如监听网络事件,进行连接管理等。

示意图如下:

我们可以看到在这个分工下面,必然面临的一个问题就是,主线程如何跟worker线程进行内存交互。比方说主线程发现DNS或者cluster信息变化了,怎么同步到各个工作线程之中,这就涉及我们接下来要了解的TLS机制。

TLS (thread local storage) 机制

概述

首先我们来看官方给出来的图

 

大致可以看得出来,主线程更新了内存对象之后,是会同步到worker线程的。但是具体细节如何,我们还不太清楚。在阅读官方的文档中,有这么一句话,也让我充满了疑惑:

 

这个worker线程的静默期是怎么回事?TLS机制是异步的吗?需不需要加锁?

代码分析

简单的看threadlocal对象会提供一个初始化的callback,在所有worker线程获取到的时候进行初始化,还有提供一个runOnAllThreads的函数,执行Update callback。

 

 

1.   worker线程的注册和主线程任务派发

worker线程在启动的时候会把自己的dispatcher注册到tls管理器里面,后续主线程更新则会通过runOnAllThreads把更新函数放到每个worker dispatcher里面。

void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
   ASSERT_IS_MAIN_OR_TEST_THREAD();
   ASSERT(!shutdown_);
 
 //registered_threads_就是前面worker注册的dispatcher,这部分代码就不贴出来了
   for (Event::Dispatcher& dispatcher : registered_threads_) {
     dispatcher.post(cb);
   }
 
   // Handle main thread.
   cb();
 }
 

注意入参cb不是const 引用或者右值引用,所以这里的cb都是临时对象。下面的post同理。

2. worker线程处理主线程post过来的更新事件

dispatcher底层是一个eventloop,各个worker线程自己维护一个callback队列。接受主线程的post事件其实就是往队列里面加就可以了。

void DispatcherImpl::post(PostCb callback) {
   bool do_post;
   {
     Thread::LockGuard lock(post_lock_);
     do_post = post_callbacks_.empty();
     post_callbacks_.push_back(std::move(callback));
   }
 
   if (do_post) {
     post_cb_->scheduleCallbackCurrentIteration();
   }
 }
 

上文说到callback作为临时对象,move调用的是移动构造函数,是可以少一次内存复制的。

scheduleCallbackCurrentIteration内部是往libevent里面插队,让libevent优先消费callback队列,只有为空的时候通知一次,因为无论如何后续消费都是消费到队列为空,无需重复通知,具体可以看下面的代码。

 

3.   worker消费callback事件

worker启动后放一个消费任务到event里面排队。当worker处理完自己的业务后,eventloop轮到他执行消费callback队列时,会把所有的callback消费掉。

消费的函数就是前面提到的的init或者update。

消费callback队列的启动器在构造函数里面:

消费所有callback队列的内容

void DispatcherImpl::runPostCallbacks() {
   // Clear the deferred delete list before running post callbacks to reduce non-determinism in
   // callback processing, and more easily detect if a scheduled post callback refers to one of the
   // objects that is being deferred deleted.
   clearDeferredDeleteList();
 
   std::list<PostCb> callbacks;
   {
     // Take ownership of the callbacks under the post_lock_. The lock must be released before
     // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
     // later in the event loop.
     Thread::LockGuard lock(post_lock_);
     callbacks = std::move(post_callbacks_);
     // post_callbacks_ should be empty after the move.
     ASSERT(post_callbacks_.empty());
   }
   // It is important that the execution and deletion of the callback happen while post_lock_ is not
   // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
   while (!callbacks.empty()) {
     // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
     // executing a long list of callbacks.
     touchWatchdog();
     // Run the callback.
     callbacks.front()();
     // Pop the front so that the destructor of the callback that just executed runs before the next
     // callback executes.
     callbacks.pop_front();
   }
 }
 

callbacks的获取也是通过std::move,一次过获取list所有内容并把原post_callbacks_置空。

随后通过front()来消费,直到排空队列。

通过上面代码可以看出来,envoy这部分设计的很巧妙,主线程的对象复制是通过调用函数入参来产生临时对象,而dispatcher的分发和消费全部通过std::move来移交使用权,这个过程中无论编译器有没有做优化,都能保证尽量少的产生内存拷贝。

有一点点小的优化空间,push_back函数应该可以改用emplace_back,少一次移动构造函数的消耗,不过这个性能影响很小。

小结

整体步骤如下:

 

至此,我们大部分疑惑都可以解开了:

  1. 内存对象广播下去的是shared指针,计数归零的时候会释放内存。
  2. callback队列还是要加锁的,但是这个是每个线程自己与主线程的竞争。粒度小很多。
  3. 所谓的quiescent period就是两次业务之间eventloop的间隔,消费callback队列本身就作为一个event放进去了。

WASM机制

背景介绍

WebAssembly是一种沙盒技术,可用于扩展Istio代理(Envoy)。Proxy-Wasm沙箱API取代了Mixer作为Istio中的主要扩展机制。

envoy的wasm机制支持我们通过其他编程语言编写插件,进行自定义的逻辑处理,并安全的调用在envoy内部。

流量标签目前就是侵入到envoy的http filter之中进行header的缓存与改写。

流程

请求 -> FilterChain  ---> upstream

                   |----> WASM在这里,是filterChain的一部分

 

源码分析

wasm入口

FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Wasm& config,
                            Server::Configuration::FactoryContext& context)
     : tls_slot_(ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
           context.threadLocal())) {
   const auto plugin = std::make_shared<Common::Wasm::Plugin>(
       config.config(), context.direction(), context.localInfo(), &context.listenerMetadata());
 
   auto callback = [plugin, this](const Common::Wasm::WasmHandleSharedPtr& base_wasm) {
     // NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
     tls_slot_->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
       return std::make_shared<PluginHandleSharedPtrThreadLocal>(
           Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
     });
   };
 
   if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), context.clusterManager(),
                                 context.initManager(), context.mainThreadDispatcher(),
                                 context.api(), context.lifecycleNotifier(), remote_data_provider_,
                                 std::move(callback))) {
     throw Common::Wasm::WasmException(
         fmt::format("Unable to create Wasm HTTP filter {}", plugin->name_));
   }
 }
 

 

1.  创建虚拟机

createWasm 就是创建wasm虚拟机。

Common:CreateWasm代码如下:

bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
                 Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
                 Event::Dispatcher& dispatcher, Api::Api& api,
                 Server::ServerLifecycleNotifier& lifecycle_notifier,
                 Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
                 CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) {
   //省略
   if (vm_config.code().has_remote()) {
   //省略
   } else if (vm_config.code().has_local()) {
     code = Config::DataSource::read(vm_config.code().local(), true, api);
     source = Config::DataSource::getPath(vm_config.code().local())
                  .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
   }
 
   auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(),
                                       MessageUtil::anyToBytes(vm_config.configuration()), code);
   auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
                       &lifecycle_notifier, create_root_context_for_testing,
                       &stats_handler](std::string code) -> bool {
     if (code.empty()) {
       cb(nullptr);
       return false;
     }
 
     auto config = plugin->wasmConfig();
     auto wasm = proxy_wasm::createWasm(
         vm_key, code, plugin,
         getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
         getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
         config.config().vm_config().allow_precompiled());
     Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
     stats_handler.onEvent(toWasmEvent(wasm));
     if (!wasm || wasm->wasm()->isFailed()) {
       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
                           "Unable to create Wasm");
       cb(nullptr);
       return false;
     }
     cb(std::static_pointer_cast<WasmHandle>(wasm));
     return true;
   };
 
   if (fetch) {
   //省略
   } else {
     return complete_cb(code);
   }
   return true;
 }
 
 

核心代码花里胡哨一大堆,最后调用到了complete_cb,里面就2件事:

1.1.  创建vm_key
  1. vm_id:就是vm_config里面定义的vm_id
  2. vm_key:vm_id+config+code

std::string makeVmKey(std::string_view vm_id, std::string_view vm_configuration,
                       std::string_view code) {
   return Sha256String({vm_id, "||", vm_configuration, "||", code});
 }
 

1.2.调用 proxy_wasm::createWasm

proxy_wasm 是引用的一个外部项目:proxy-wasm/proxy-wasm-cpp-host/blob/master/src/wasm.cc

是真正的创建沙箱的逻辑。

std::shared_ptr<WasmHandleBase> createWasm(const std::string &vm_key, const std::string &code,
                                            const std::shared_ptr<PluginBase> &plugin,
                                            const WasmHandleFactory &factory,
                                            const WasmHandleCloneFactory &clone_factory,
                                            bool allow_precompiled) {
   std::shared_ptr<WasmHandleBase> wasm_handle;
   {
     std::lock_guard<std::mutex> guard(base_wasms_mutex);
     if (base_wasms == nullptr) {
       base_wasms = new std::remove_reference<decltype(*base_wasms)>::type;
     }
     auto it = base_wasms->find(vm_key);
     if (it != base_wasms->end()) {
       wasm_handle = it->second.lock();
       if (!wasm_handle) {
         base_wasms->erase(it);
       }
     }
     if (!wasm_handle) {
       // If no cached base_wasm, creates a new base_wasm, loads the code and initializes it.
       wasm_handle = factory(vm_key);
       if (!wasm_handle) {
         return nullptr;
       }
       if (!wasm_handle->wasm()->load(code, allow_precompiled)) {
         wasm_handle->wasm()->fail(FailState::UnableToInitializeCode, "Failed to load Wasm code");
         return nullptr;
       }
       if (!wasm_handle->wasm()->initialize()) {
         wasm_handle->wasm()->fail(FailState::UnableToInitializeCode,
                                   "Failed to initialize Wasm code");
         return nullptr;
       }
       (*base_wasms)[vm_key] = wasm_handle;
     }
   }
 
   // Either creating new one or reusing the existing one, apply canary for each plugin.
   if (!wasm_handle->canary(plugin, clone_factory)) {
     return nullptr;
   }
   return wasm_handle;
 };
 

里面先是本地查找有没有对应的沙箱,没有的话调用factory新建一个,然后依次调用load,initialize等。

initialize会初始化wasm_vm_(依赖配置中的runtime,貌似现在用的是v8),然后注册一些方法,这些都是具体虚拟机映射api的动作了,暂时略过。

2.  回调cb函数

对应的就是上一段调用端的getOrCreateThreadLocalPlugin,这个后续再展开。

这里只关心callback调用时机。是在入口主线程,这个base_wasm会在callback的时候给每个子线程创建, 代码如下:

  /**
    * Set thread local data on all threads previously registered via registerThread().
    * @param initializeCb supplies the functor that will be called *on each thread*. The functor
    *                     returns the thread local object which is then stored. The storage is via
    *                     a shared_ptr. Thus, this is a flexible mechanism that can be used to share
    *                     the same data across all threads or to share different data on each thread.
    *
    * NOTE: The initialize callback is not supposed to capture the Slot, or its owner, as the owner
    * may be destructed in main thread before the update_cb gets called in a worker thread.
    */
   using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>;
   virtual void set(InitializeCb cb) PURE;
 

实现函数为getOrCreateThreadLocalPlugin:

PluginHandleSharedPtr
 getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
                              Event::Dispatcher& dispatcher,
                              CreateContextFn create_root_context_for_testing) {
   if (!base_wasm) {
     if (!plugin->fail_open_) {
       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
                           "Plugin configured to fail closed failed to load");
     }
     // To handle the case when failed to create VMs and fail-open/close properly,
     // we still create PluginHandle with null WasmBase.
     return std::make_shared<PluginHandle>(nullptr, plugin);
   }
   return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
       std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
       getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
       getPluginHandleFactory()));
 }
 

这部分代码又依赖到了proxy_wasm项目

关键代码如下:

std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
     const std::shared_ptr<WasmHandleBase> &base_handle, const std::shared_ptr<PluginBase> &plugin,
     const WasmHandleCloneFactory &clone_factory, const PluginHandleFactory &plugin_factory) {
   std::string key(std::string(base_handle->wasm()->vm_key()) + "||" + plugin->key());
   // Get existing thread-local Plugin handle.
   auto it = local_plugins.find(key);
   if (it != local_plugins.end()) {
     auto plugin_handle = it->second.lock();
     if (plugin_handle) {
       return plugin_handle;
     }
     local_plugins.erase(it);
   }
   removeStaleLocalCacheEntries(local_plugins, local_plugins_keys);
   // Get thread-local WasmVM.
   auto wasm_handle = getOrCreateThreadLocalWasm(base_handle, clone_factory);
   if (!wasm_handle) {
     return nullptr;
   }
   // Create and initialize new thread-local Plugin.
   auto *plugin_context = wasm_handle->wasm()->start(plugin);
   if (plugin_context == nullptr) {
     base_handle->wasm()->fail(FailState::StartFailed, "Failed to start thread-local Wasm");
     return nullptr;
   }
   if (!wasm_handle->wasm()->configure(plugin_context, plugin)) {
     base_handle->wasm()->fail(FailState::ConfigureFailed,
                               "Failed to configure thread-local Wasm plugin");
     return nullptr;
   }
   auto plugin_handle = plugin_factory(wasm_handle, plugin);
   cacheLocalPlugin(key, plugin_handle);
   wasm_handle->wasm()->wasm_vm()->addFailCallback([key](proxy_wasm::FailState fail_state) {
     if (fail_state == proxy_wasm::FailState::RuntimeError) {
       // If VM failed, erase the entry so that:
       // 1) we can recreate the new thread local plugin from the same base_wasm.
       // 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
       local_plugins.erase(key);
     };
   });
   return plugin_handle;
 }
 

这段代码做了几件事情:

  1. 按照vm_key查找本地有没有对应的plugin对象,有的话直接用。
  2. 没有的话调用factory函数,创建一个。

至此,基本可以搞清楚整个vm的创建过程了,主线程先创建沙箱,然后通过TLS机制给到各个worker线程再建本地对象。而这些过程中,vm_key会作为一个缓存key,多次使用同样的代码和vm_id是可以复用线程内已有的沙箱的。

小结

沙箱机制大致示意图如下

特殊地,在主线程中也有沙箱,主线程的沙箱是一个wasmservice类,用户也能编写插件,如果指定singleton为true则复用该句柄。

插入worker插件可以任意指定filter,如HTTP_FILTER。插入主线程则指定为BOOTSTRAP。

共享数据

关键结构体:Context, SharedData

1.  Context

顾名思义就是上下文。

有以下几种:

ContextBase(WasmBase *wasm);                                            // Vm Context.
ContextBase(WasmBase *wasm, const std::shared_ptr<PluginBase> &plugin); // Root Context.
ContextBase(WasmBase *wasm, uint32_t parent_context_id,
               const std::shared_ptr<PluginHandleBase> &plugin_handle); // Stream context.
 

一个VMContext对应创建多个PluginContext,PluginContext再分别给每个请求创建context。Context里面可以进行一些变量的传递。但是需要注意的是,每种context都有自己的作用范围。

官方的描述大概如下:

 

wasm本身还会由envoy放入一些属性,可以给我们上下文使用的。

Wasm attributes

In addition to all above, the following extra attributes are available to Wasm extensions:

Attribute

Type

Description

plugin_name

string

Plugin name

plugin_root_id

string

Plugin root ID

plugin_vm_id

string

Plugin VM ID

node

Node

Local node description

cluster_name

string

Upstream cluster name

cluster_metadata

Metadata

Upstream cluster metadata

listener_direction

int

Enumeration value of the listener traffic direction

listener_metadata

Metadata

Listener metadata

route_name

string

Route name

route_metadata

Metadata

Route metadata

upstream_host_metadata

Metadata

Upstream host metadata

listener_diretion需要使用int64来解析,参考代码如下:

property, err := proxywasm.GetProperty([]string{"listener_direction"})
if err != nil {
proxywasm.LogErrorf("GetProperty listener_direction fail err %v try xds.listener_direction", err)
property, err = proxywasm.GetProperty([]string{"xds", "listener_direction"})
if err != nil {
proxywasm.LogErrorf("GetProperty xds.listener_direction failed %v", err)
return
}
}
d := binary.LittleEndian.Uint64(property)
//UNSPECIFIED
//(DEFAULT) Default option is unspecified.
//
//INBOUND
//The transport is used for incoming traffic.
//
//OUTBOUND
//The transport is used for outgoing traffic.
if d == 2 {
ctx.Outbound = true
}

2.  SharedData

由于context的内存隔离,在不同线程里面同步数据是需要其他特殊机制的,SharedData是目前可用的一种方式。

定义

// Shared Data
 WasmResult ContextBase::getSharedData(std::string_view key,
                                       std::pair<std::string, uint32_t> *data) {
   return getGlobalSharedData().get(wasm_->vm_id(), key, data);
 }
 
 WasmResult ContextBase::setSharedData(std::string_view key, std::string_view value, uint32_t cas) {
   return getGlobalSharedData().set(wasm_->vm_id(), key, value, cas);
 }
 
 SharedData &getGlobalSharedData() {
   static auto *ptr = new SharedData;
   return *ptr;
 };
 

可以看到每个上下文,都可以使用sharedData,getGlobalSharedData内部是一个static指针,指向一个块SharedData,表示这个是一个全局静态变量。而且没有析构函数,可以近似认为这个是单例的一个内存空间。

注:虽然没有析构函数,但是每个vm创建后还是会注册一个函数给SharedData,传入vmid,在vm析构的过程中,会先调用SharedData的deleteByVmId把内存降下来。

关键函数是registerVmIdHandleCallback和~VmIdHandle()。VmIdHandle为WasmBase里面的一个shared_ptr智能指针,引用数为0后会析构。这部分代码不展开讲了,有兴趣的同学自己上github看。

SharedData分析

SharedData按vmid分map。每个map里面value为另一个unordered_map。这个unordered_map才是真正的共享数据,value为data+数据版本号(cas)。

  // TODO: use std::shared_mutex in C++17.
   std::mutex mutex_;
   uint32_t cas_ = 1;
   std::map<std::string, std::unordered_map<std::string, std::pair<std::string, uint32_t>>> data_;
 

读写时的cas

  1. 目前还是用的mutex,这个是个很重的锁,但是有意改为使用shared_mutex。
  2. 当读写锁分开后,写操作需要利用数据版本号控制。

读接口把数据和版本号cas一起返回,调用端如果需要把数据写回的话,会判断cas是否为最新,如果不是最新,则表示数据被更新过,这样需要调用端自行重试。

读操作就不看了,简单看一些写操作:

WasmResult SharedData::set(std::string_view vm_id, std::string_view key, std::string_view value,
                            uint32_t cas) {
   std::lock_guard<std::mutex> lock(mutex_);
   std::unordered_map<std::string, std::pair<std::string, uint32_t>> *map;
   auto map_it = data_.find(std::string(vm_id));
   if (map_it == data_.end()) {
     map = &data_[std::string(vm_id)];
   } else {
     map = &map_it->second;
   }
   auto it = map->find(std::string(key));
   if (it != map->end()) {
     if (cas != 0U && cas != it->second.second) {
       return WasmResult::CasMismatch;
     }
     it->second = std::make_pair(std::string(value), nextCas());
   } else {
     map->emplace(key, std::make_pair(std::string(value), nextCas()));
   }
   return WasmResult::Ok;
 }
 

nextCas还是一个全局的计数器,是一个软CAS,甚至没有vmid隔离,感觉还有优化空间。锁粒度未全局级,也有优化空间。

在Context中使用

WasmResult ContextBase::setSharedData(std::string_view key, std::string_view value, uint32_t cas) {
   return getGlobalSharedData().set(wasm_->vm_id(), key, value, cas);
 }
 

使用比较简单,直接get/set就行了,这里屏蔽了vm_id,就是说SharedData在插件编写者的角度来看,只能读写自己沙箱的数据,但是可以多线程共享。

如何使用沙箱

c++为例

c++sdk给出两个Context的工厂函数,我们可以自行创建子类来实现,里面有很多的虚函数,我们按需来继承就好。

他们分别是RootContext和Context。对应的就是Plugin级别的Context和Stream级别Context的。

sdk中给出的ContextBase虽然跟vm级别的Context同名,但是这仅仅是sdk给RootContext的普通Context的一个公共父类,重名只是巧合。我们是没有办法操作vm级别的Context的。

从前文的代码分析也可以看出来,Plugin是每个线程通过tls机制初始化的,他们和主线程之间内存隔离,线程之间也相互隔离。

沙箱的函数隔离

wasm的沙箱在启动的时候会注册一个函数列表,后续这个函数列表跟沙箱外的函数列表一一映射,在调用时会找到相应的函数运行。

envoy的函数注册

void Wasm::registerCallbacks() {
   WasmBase::registerCallbacks();
 #define _REGISTER(_fn)                                                                             \
   wasm_vm_->registerCallback(                                                                      \
       "env", "envoy_" #_fn, &_fn,                                                                  \
       &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
   _REGISTER(resolve_dns);
 #undef _REGISTER
 }
 

proxy_wasm的函数注册

#define _REGISTER_PROXY(_fn) _REGISTER("env", "proxy_", , _fn)
   FOR_ALL_HOST_FUNCTIONS(_REGISTER_PROXY);
 
   if (abiVersion() == AbiVersion::ProxyWasm_0_1_0) {
     _REGISTER_PROXY(get_configuration);
     _REGISTER_PROXY(continue_request);
     _REGISTER_PROXY(continue_response);
     _REGISTER_PROXY(clear_route_cache);
   } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_0) {
     _REGISTER_PROXY(continue_stream);
     _REGISTER_PROXY(close_stream);
   } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_1) {
     _REGISTER_PROXY(continue_stream);
     _REGISTER_PROXY(close_stream);
     _REGISTER_PROXY(get_log_level);
   }
 #undef _REGISTER_PROXY
 
 //其中FOR_ALL_HOST_FUNCTIONS包括以下函数
 
 #define FOR_ALL_HOST_FUNCTIONS(_f)                                                                 \
   _f(log) _f(get_status) _f(set_property) _f(get_property) _f(send_local_response)                 \
       _f(get_shared_data) _f(set_shared_data) _f(register_shared_queue) _f(resolve_shared_queue)   \
           _f(dequeue_shared_queue) _f(enqueue_shared_queue) _f(get_header_map_value)               \
               _f(add_header_map_value) _f(replace_header_map_value) _f(remove_header_map_value)    \
                   _f(get_header_map_pairs) _f(set_header_map_pairs) _f(get_header_map_size)        \
                       _f(get_buffer_status) _f(get_buffer_bytes) _f(set_buffer_bytes)              \
                           _f(http_call) _f(grpc_call) _f(grpc_stream) _f(grpc_close)               \
                               _f(grpc_cancel) _f(grpc_send) _f(set_tick_period_milliseconds)       \
                                   _f(get_current_time_nanoseconds) _f(define_metric)               \
                                       _f(increment_metric) _f(record_metric) _f(get_metric)        \
                                           _f(set_effective_context) _f(done)                       \
                                               _f(call_foreign_function)

 

0条评论
0 / 1000
g****m
3文章数
0粉丝数
g****m
3 文章 | 0 粉丝
g****m
3文章数
0粉丝数
g****m
3 文章 | 0 粉丝
原创

envoy源码分析:线程模型与沙箱机制

2024-07-05 09:56:43
158
0

线程模型

envoy是一个多线程模型,参考官方的说法,envoy会起一个main thread主线程,主要负责做任务调度,统计汇总等。

同时envoy会根据核数起多个worker线程,主要负责具体的业务,诸如监听网络事件,进行连接管理等。

示意图如下:

我们可以看到在这个分工下面,必然面临的一个问题就是,主线程如何跟worker线程进行内存交互。比方说主线程发现DNS或者cluster信息变化了,怎么同步到各个工作线程之中,这就涉及我们接下来要了解的TLS机制。

TLS (thread local storage) 机制

概述

首先我们来看官方给出来的图

 

大致可以看得出来,主线程更新了内存对象之后,是会同步到worker线程的。但是具体细节如何,我们还不太清楚。在阅读官方的文档中,有这么一句话,也让我充满了疑惑:

 

这个worker线程的静默期是怎么回事?TLS机制是异步的吗?需不需要加锁?

代码分析

简单的看threadlocal对象会提供一个初始化的callback,在所有worker线程获取到的时候进行初始化,还有提供一个runOnAllThreads的函数,执行Update callback。

 

 

1.   worker线程的注册和主线程任务派发

worker线程在启动的时候会把自己的dispatcher注册到tls管理器里面,后续主线程更新则会通过runOnAllThreads把更新函数放到每个worker dispatcher里面。

void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
   ASSERT_IS_MAIN_OR_TEST_THREAD();
   ASSERT(!shutdown_);
 
 //registered_threads_就是前面worker注册的dispatcher,这部分代码就不贴出来了
   for (Event::Dispatcher& dispatcher : registered_threads_) {
     dispatcher.post(cb);
   }
 
   // Handle main thread.
   cb();
 }
 

注意入参cb不是const 引用或者右值引用,所以这里的cb都是临时对象。下面的post同理。

2. worker线程处理主线程post过来的更新事件

dispatcher底层是一个eventloop,各个worker线程自己维护一个callback队列。接受主线程的post事件其实就是往队列里面加就可以了。

void DispatcherImpl::post(PostCb callback) {
   bool do_post;
   {
     Thread::LockGuard lock(post_lock_);
     do_post = post_callbacks_.empty();
     post_callbacks_.push_back(std::move(callback));
   }
 
   if (do_post) {
     post_cb_->scheduleCallbackCurrentIteration();
   }
 }
 

上文说到callback作为临时对象,move调用的是移动构造函数,是可以少一次内存复制的。

scheduleCallbackCurrentIteration内部是往libevent里面插队,让libevent优先消费callback队列,只有为空的时候通知一次,因为无论如何后续消费都是消费到队列为空,无需重复通知,具体可以看下面的代码。

 

3.   worker消费callback事件

worker启动后放一个消费任务到event里面排队。当worker处理完自己的业务后,eventloop轮到他执行消费callback队列时,会把所有的callback消费掉。

消费的函数就是前面提到的的init或者update。

消费callback队列的启动器在构造函数里面:

消费所有callback队列的内容

void DispatcherImpl::runPostCallbacks() {
   // Clear the deferred delete list before running post callbacks to reduce non-determinism in
   // callback processing, and more easily detect if a scheduled post callback refers to one of the
   // objects that is being deferred deleted.
   clearDeferredDeleteList();
 
   std::list<PostCb> callbacks;
   {
     // Take ownership of the callbacks under the post_lock_. The lock must be released before
     // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
     // later in the event loop.
     Thread::LockGuard lock(post_lock_);
     callbacks = std::move(post_callbacks_);
     // post_callbacks_ should be empty after the move.
     ASSERT(post_callbacks_.empty());
   }
   // It is important that the execution and deletion of the callback happen while post_lock_ is not
   // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
   while (!callbacks.empty()) {
     // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
     // executing a long list of callbacks.
     touchWatchdog();
     // Run the callback.
     callbacks.front()();
     // Pop the front so that the destructor of the callback that just executed runs before the next
     // callback executes.
     callbacks.pop_front();
   }
 }
 

callbacks的获取也是通过std::move,一次过获取list所有内容并把原post_callbacks_置空。

随后通过front()来消费,直到排空队列。

通过上面代码可以看出来,envoy这部分设计的很巧妙,主线程的对象复制是通过调用函数入参来产生临时对象,而dispatcher的分发和消费全部通过std::move来移交使用权,这个过程中无论编译器有没有做优化,都能保证尽量少的产生内存拷贝。

有一点点小的优化空间,push_back函数应该可以改用emplace_back,少一次移动构造函数的消耗,不过这个性能影响很小。

小结

整体步骤如下:

 

至此,我们大部分疑惑都可以解开了:

  1. 内存对象广播下去的是shared指针,计数归零的时候会释放内存。
  2. callback队列还是要加锁的,但是这个是每个线程自己与主线程的竞争。粒度小很多。
  3. 所谓的quiescent period就是两次业务之间eventloop的间隔,消费callback队列本身就作为一个event放进去了。

WASM机制

背景介绍

WebAssembly是一种沙盒技术,可用于扩展Istio代理(Envoy)。Proxy-Wasm沙箱API取代了Mixer作为Istio中的主要扩展机制。

envoy的wasm机制支持我们通过其他编程语言编写插件,进行自定义的逻辑处理,并安全的调用在envoy内部。

流量标签目前就是侵入到envoy的http filter之中进行header的缓存与改写。

流程

请求 -> FilterChain  ---> upstream

                   |----> WASM在这里,是filterChain的一部分

 

源码分析

wasm入口

FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Wasm& config,
                            Server::Configuration::FactoryContext& context)
     : tls_slot_(ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
           context.threadLocal())) {
   const auto plugin = std::make_shared<Common::Wasm::Plugin>(
       config.config(), context.direction(), context.localInfo(), &context.listenerMetadata());
 
   auto callback = [plugin, this](const Common::Wasm::WasmHandleSharedPtr& base_wasm) {
     // NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
     tls_slot_->set([base_wasm, plugin](Event::Dispatcher& dispatcher) {
       return std::make_shared<PluginHandleSharedPtrThreadLocal>(
           Common::Wasm::getOrCreateThreadLocalPlugin(base_wasm, plugin, dispatcher));
     });
   };
 
   if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), context.clusterManager(),
                                 context.initManager(), context.mainThreadDispatcher(),
                                 context.api(), context.lifecycleNotifier(), remote_data_provider_,
                                 std::move(callback))) {
     throw Common::Wasm::WasmException(
         fmt::format("Unable to create Wasm HTTP filter {}", plugin->name_));
   }
 }
 

 

1.  创建虚拟机

createWasm 就是创建wasm虚拟机。

Common:CreateWasm代码如下:

bool createWasm(const PluginSharedPtr& plugin, const Stats::ScopeSharedPtr& scope,
                 Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
                 Event::Dispatcher& dispatcher, Api::Api& api,
                 Server::ServerLifecycleNotifier& lifecycle_notifier,
                 Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
                 CreateWasmCallback&& cb, CreateContextFn create_root_context_for_testing) {
   //省略
   if (vm_config.code().has_remote()) {
   //省略
   } else if (vm_config.code().has_local()) {
     code = Config::DataSource::read(vm_config.code().local(), true, api);
     source = Config::DataSource::getPath(vm_config.code().local())
                  .value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
   }
 
   auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(),
                                       MessageUtil::anyToBytes(vm_config.configuration()), code);
   auto complete_cb = [cb, vm_key, plugin, scope, &api, &cluster_manager, &dispatcher,
                       &lifecycle_notifier, create_root_context_for_testing,
                       &stats_handler](std::string code) -> bool {
     if (code.empty()) {
       cb(nullptr);
       return false;
     }
 
     auto config = plugin->wasmConfig();
     auto wasm = proxy_wasm::createWasm(
         vm_key, code, plugin,
         getWasmHandleFactory(config, scope, api, cluster_manager, dispatcher, lifecycle_notifier),
         getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
         config.config().vm_config().allow_precompiled());
     Stats::ScopeSharedPtr create_wasm_stats_scope = stats_handler.lockAndCreateStats(scope);
     stats_handler.onEvent(toWasmEvent(wasm));
     if (!wasm || wasm->wasm()->isFailed()) {
       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), trace,
                           "Unable to create Wasm");
       cb(nullptr);
       return false;
     }
     cb(std::static_pointer_cast<WasmHandle>(wasm));
     return true;
   };
 
   if (fetch) {
   //省略
   } else {
     return complete_cb(code);
   }
   return true;
 }
 
 

核心代码花里胡哨一大堆,最后调用到了complete_cb,里面就2件事:

1.1.  创建vm_key
  1. vm_id:就是vm_config里面定义的vm_id
  2. vm_key:vm_id+config+code

std::string makeVmKey(std::string_view vm_id, std::string_view vm_configuration,
                       std::string_view code) {
   return Sha256String({vm_id, "||", vm_configuration, "||", code});
 }
 

1.2.调用 proxy_wasm::createWasm

proxy_wasm 是引用的一个外部项目:proxy-wasm/proxy-wasm-cpp-host/blob/master/src/wasm.cc

是真正的创建沙箱的逻辑。

std::shared_ptr<WasmHandleBase> createWasm(const std::string &vm_key, const std::string &code,
                                            const std::shared_ptr<PluginBase> &plugin,
                                            const WasmHandleFactory &factory,
                                            const WasmHandleCloneFactory &clone_factory,
                                            bool allow_precompiled) {
   std::shared_ptr<WasmHandleBase> wasm_handle;
   {
     std::lock_guard<std::mutex> guard(base_wasms_mutex);
     if (base_wasms == nullptr) {
       base_wasms = new std::remove_reference<decltype(*base_wasms)>::type;
     }
     auto it = base_wasms->find(vm_key);
     if (it != base_wasms->end()) {
       wasm_handle = it->second.lock();
       if (!wasm_handle) {
         base_wasms->erase(it);
       }
     }
     if (!wasm_handle) {
       // If no cached base_wasm, creates a new base_wasm, loads the code and initializes it.
       wasm_handle = factory(vm_key);
       if (!wasm_handle) {
         return nullptr;
       }
       if (!wasm_handle->wasm()->load(code, allow_precompiled)) {
         wasm_handle->wasm()->fail(FailState::UnableToInitializeCode, "Failed to load Wasm code");
         return nullptr;
       }
       if (!wasm_handle->wasm()->initialize()) {
         wasm_handle->wasm()->fail(FailState::UnableToInitializeCode,
                                   "Failed to initialize Wasm code");
         return nullptr;
       }
       (*base_wasms)[vm_key] = wasm_handle;
     }
   }
 
   // Either creating new one or reusing the existing one, apply canary for each plugin.
   if (!wasm_handle->canary(plugin, clone_factory)) {
     return nullptr;
   }
   return wasm_handle;
 };
 

里面先是本地查找有没有对应的沙箱,没有的话调用factory新建一个,然后依次调用load,initialize等。

initialize会初始化wasm_vm_(依赖配置中的runtime,貌似现在用的是v8),然后注册一些方法,这些都是具体虚拟机映射api的动作了,暂时略过。

2.  回调cb函数

对应的就是上一段调用端的getOrCreateThreadLocalPlugin,这个后续再展开。

这里只关心callback调用时机。是在入口主线程,这个base_wasm会在callback的时候给每个子线程创建, 代码如下:

  /**
    * Set thread local data on all threads previously registered via registerThread().
    * @param initializeCb supplies the functor that will be called *on each thread*. The functor
    *                     returns the thread local object which is then stored. The storage is via
    *                     a shared_ptr. Thus, this is a flexible mechanism that can be used to share
    *                     the same data across all threads or to share different data on each thread.
    *
    * NOTE: The initialize callback is not supposed to capture the Slot, or its owner, as the owner
    * may be destructed in main thread before the update_cb gets called in a worker thread.
    */
   using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>;
   virtual void set(InitializeCb cb) PURE;
 

实现函数为getOrCreateThreadLocalPlugin:

PluginHandleSharedPtr
 getOrCreateThreadLocalPlugin(const WasmHandleSharedPtr& base_wasm, const PluginSharedPtr& plugin,
                              Event::Dispatcher& dispatcher,
                              CreateContextFn create_root_context_for_testing) {
   if (!base_wasm) {
     if (!plugin->fail_open_) {
       ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), critical,
                           "Plugin configured to fail closed failed to load");
     }
     // To handle the case when failed to create VMs and fail-open/close properly,
     // we still create PluginHandle with null WasmBase.
     return std::make_shared<PluginHandle>(nullptr, plugin);
   }
   return std::static_pointer_cast<PluginHandle>(proxy_wasm::getOrCreateThreadLocalPlugin(
       std::static_pointer_cast<WasmHandle>(base_wasm), plugin,
       getWasmHandleCloneFactory(dispatcher, create_root_context_for_testing),
       getPluginHandleFactory()));
 }
 

这部分代码又依赖到了proxy_wasm项目

关键代码如下:

std::shared_ptr<PluginHandleBase> getOrCreateThreadLocalPlugin(
     const std::shared_ptr<WasmHandleBase> &base_handle, const std::shared_ptr<PluginBase> &plugin,
     const WasmHandleCloneFactory &clone_factory, const PluginHandleFactory &plugin_factory) {
   std::string key(std::string(base_handle->wasm()->vm_key()) + "||" + plugin->key());
   // Get existing thread-local Plugin handle.
   auto it = local_plugins.find(key);
   if (it != local_plugins.end()) {
     auto plugin_handle = it->second.lock();
     if (plugin_handle) {
       return plugin_handle;
     }
     local_plugins.erase(it);
   }
   removeStaleLocalCacheEntries(local_plugins, local_plugins_keys);
   // Get thread-local WasmVM.
   auto wasm_handle = getOrCreateThreadLocalWasm(base_handle, clone_factory);
   if (!wasm_handle) {
     return nullptr;
   }
   // Create and initialize new thread-local Plugin.
   auto *plugin_context = wasm_handle->wasm()->start(plugin);
   if (plugin_context == nullptr) {
     base_handle->wasm()->fail(FailState::StartFailed, "Failed to start thread-local Wasm");
     return nullptr;
   }
   if (!wasm_handle->wasm()->configure(plugin_context, plugin)) {
     base_handle->wasm()->fail(FailState::ConfigureFailed,
                               "Failed to configure thread-local Wasm plugin");
     return nullptr;
   }
   auto plugin_handle = plugin_factory(wasm_handle, plugin);
   cacheLocalPlugin(key, plugin_handle);
   wasm_handle->wasm()->wasm_vm()->addFailCallback([key](proxy_wasm::FailState fail_state) {
     if (fail_state == proxy_wasm::FailState::RuntimeError) {
       // If VM failed, erase the entry so that:
       // 1) we can recreate the new thread local plugin from the same base_wasm.
       // 2) we wouldn't reuse the failed VM for new plugin configs accidentally.
       local_plugins.erase(key);
     };
   });
   return plugin_handle;
 }
 

这段代码做了几件事情:

  1. 按照vm_key查找本地有没有对应的plugin对象,有的话直接用。
  2. 没有的话调用factory函数,创建一个。

至此,基本可以搞清楚整个vm的创建过程了,主线程先创建沙箱,然后通过TLS机制给到各个worker线程再建本地对象。而这些过程中,vm_key会作为一个缓存key,多次使用同样的代码和vm_id是可以复用线程内已有的沙箱的。

小结

沙箱机制大致示意图如下

特殊地,在主线程中也有沙箱,主线程的沙箱是一个wasmservice类,用户也能编写插件,如果指定singleton为true则复用该句柄。

插入worker插件可以任意指定filter,如HTTP_FILTER。插入主线程则指定为BOOTSTRAP。

共享数据

关键结构体:Context, SharedData

1.  Context

顾名思义就是上下文。

有以下几种:

ContextBase(WasmBase *wasm);                                            // Vm Context.
ContextBase(WasmBase *wasm, const std::shared_ptr<PluginBase> &plugin); // Root Context.
ContextBase(WasmBase *wasm, uint32_t parent_context_id,
               const std::shared_ptr<PluginHandleBase> &plugin_handle); // Stream context.
 

一个VMContext对应创建多个PluginContext,PluginContext再分别给每个请求创建context。Context里面可以进行一些变量的传递。但是需要注意的是,每种context都有自己的作用范围。

官方的描述大概如下:

 

wasm本身还会由envoy放入一些属性,可以给我们上下文使用的。

Wasm attributes

In addition to all above, the following extra attributes are available to Wasm extensions:

Attribute

Type

Description

plugin_name

string

Plugin name

plugin_root_id

string

Plugin root ID

plugin_vm_id

string

Plugin VM ID

node

Node

Local node description

cluster_name

string

Upstream cluster name

cluster_metadata

Metadata

Upstream cluster metadata

listener_direction

int

Enumeration value of the listener traffic direction

listener_metadata

Metadata

Listener metadata

route_name

string

Route name

route_metadata

Metadata

Route metadata

upstream_host_metadata

Metadata

Upstream host metadata

listener_diretion需要使用int64来解析,参考代码如下:

property, err := proxywasm.GetProperty([]string{"listener_direction"})
if err != nil {
proxywasm.LogErrorf("GetProperty listener_direction fail err %v try xds.listener_direction", err)
property, err = proxywasm.GetProperty([]string{"xds", "listener_direction"})
if err != nil {
proxywasm.LogErrorf("GetProperty xds.listener_direction failed %v", err)
return
}
}
d := binary.LittleEndian.Uint64(property)
//UNSPECIFIED
//(DEFAULT) Default option is unspecified.
//
//INBOUND
//The transport is used for incoming traffic.
//
//OUTBOUND
//The transport is used for outgoing traffic.
if d == 2 {
ctx.Outbound = true
}

2.  SharedData

由于context的内存隔离,在不同线程里面同步数据是需要其他特殊机制的,SharedData是目前可用的一种方式。

定义

// Shared Data
 WasmResult ContextBase::getSharedData(std::string_view key,
                                       std::pair<std::string, uint32_t> *data) {
   return getGlobalSharedData().get(wasm_->vm_id(), key, data);
 }
 
 WasmResult ContextBase::setSharedData(std::string_view key, std::string_view value, uint32_t cas) {
   return getGlobalSharedData().set(wasm_->vm_id(), key, value, cas);
 }
 
 SharedData &getGlobalSharedData() {
   static auto *ptr = new SharedData;
   return *ptr;
 };
 

可以看到每个上下文,都可以使用sharedData,getGlobalSharedData内部是一个static指针,指向一个块SharedData,表示这个是一个全局静态变量。而且没有析构函数,可以近似认为这个是单例的一个内存空间。

注:虽然没有析构函数,但是每个vm创建后还是会注册一个函数给SharedData,传入vmid,在vm析构的过程中,会先调用SharedData的deleteByVmId把内存降下来。

关键函数是registerVmIdHandleCallback和~VmIdHandle()。VmIdHandle为WasmBase里面的一个shared_ptr智能指针,引用数为0后会析构。这部分代码不展开讲了,有兴趣的同学自己上github看。

SharedData分析

SharedData按vmid分map。每个map里面value为另一个unordered_map。这个unordered_map才是真正的共享数据,value为data+数据版本号(cas)。

  // TODO: use std::shared_mutex in C++17.
   std::mutex mutex_;
   uint32_t cas_ = 1;
   std::map<std::string, std::unordered_map<std::string, std::pair<std::string, uint32_t>>> data_;
 

读写时的cas

  1. 目前还是用的mutex,这个是个很重的锁,但是有意改为使用shared_mutex。
  2. 当读写锁分开后,写操作需要利用数据版本号控制。

读接口把数据和版本号cas一起返回,调用端如果需要把数据写回的话,会判断cas是否为最新,如果不是最新,则表示数据被更新过,这样需要调用端自行重试。

读操作就不看了,简单看一些写操作:

WasmResult SharedData::set(std::string_view vm_id, std::string_view key, std::string_view value,
                            uint32_t cas) {
   std::lock_guard<std::mutex> lock(mutex_);
   std::unordered_map<std::string, std::pair<std::string, uint32_t>> *map;
   auto map_it = data_.find(std::string(vm_id));
   if (map_it == data_.end()) {
     map = &data_[std::string(vm_id)];
   } else {
     map = &map_it->second;
   }
   auto it = map->find(std::string(key));
   if (it != map->end()) {
     if (cas != 0U && cas != it->second.second) {
       return WasmResult::CasMismatch;
     }
     it->second = std::make_pair(std::string(value), nextCas());
   } else {
     map->emplace(key, std::make_pair(std::string(value), nextCas()));
   }
   return WasmResult::Ok;
 }
 

nextCas还是一个全局的计数器,是一个软CAS,甚至没有vmid隔离,感觉还有优化空间。锁粒度未全局级,也有优化空间。

在Context中使用

WasmResult ContextBase::setSharedData(std::string_view key, std::string_view value, uint32_t cas) {
   return getGlobalSharedData().set(wasm_->vm_id(), key, value, cas);
 }
 

使用比较简单,直接get/set就行了,这里屏蔽了vm_id,就是说SharedData在插件编写者的角度来看,只能读写自己沙箱的数据,但是可以多线程共享。

如何使用沙箱

c++为例

c++sdk给出两个Context的工厂函数,我们可以自行创建子类来实现,里面有很多的虚函数,我们按需来继承就好。

他们分别是RootContext和Context。对应的就是Plugin级别的Context和Stream级别Context的。

sdk中给出的ContextBase虽然跟vm级别的Context同名,但是这仅仅是sdk给RootContext的普通Context的一个公共父类,重名只是巧合。我们是没有办法操作vm级别的Context的。

从前文的代码分析也可以看出来,Plugin是每个线程通过tls机制初始化的,他们和主线程之间内存隔离,线程之间也相互隔离。

沙箱的函数隔离

wasm的沙箱在启动的时候会注册一个函数列表,后续这个函数列表跟沙箱外的函数列表一一映射,在调用时会找到相应的函数运行。

envoy的函数注册

void Wasm::registerCallbacks() {
   WasmBase::registerCallbacks();
 #define _REGISTER(_fn)                                                                             \
   wasm_vm_->registerCallback(                                                                      \
       "env", "envoy_" #_fn, &_fn,                                                                  \
       &proxy_wasm::ConvertFunctionWordToUint32<decltype(_fn), _fn>::convertFunctionWordToUint32)
   _REGISTER(resolve_dns);
 #undef _REGISTER
 }
 

proxy_wasm的函数注册

#define _REGISTER_PROXY(_fn) _REGISTER("env", "proxy_", , _fn)
   FOR_ALL_HOST_FUNCTIONS(_REGISTER_PROXY);
 
   if (abiVersion() == AbiVersion::ProxyWasm_0_1_0) {
     _REGISTER_PROXY(get_configuration);
     _REGISTER_PROXY(continue_request);
     _REGISTER_PROXY(continue_response);
     _REGISTER_PROXY(clear_route_cache);
   } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_0) {
     _REGISTER_PROXY(continue_stream);
     _REGISTER_PROXY(close_stream);
   } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_1) {
     _REGISTER_PROXY(continue_stream);
     _REGISTER_PROXY(close_stream);
     _REGISTER_PROXY(get_log_level);
   }
 #undef _REGISTER_PROXY
 
 //其中FOR_ALL_HOST_FUNCTIONS包括以下函数
 
 #define FOR_ALL_HOST_FUNCTIONS(_f)                                                                 \
   _f(log) _f(get_status) _f(set_property) _f(get_property) _f(send_local_response)                 \
       _f(get_shared_data) _f(set_shared_data) _f(register_shared_queue) _f(resolve_shared_queue)   \
           _f(dequeue_shared_queue) _f(enqueue_shared_queue) _f(get_header_map_value)               \
               _f(add_header_map_value) _f(replace_header_map_value) _f(remove_header_map_value)    \
                   _f(get_header_map_pairs) _f(set_header_map_pairs) _f(get_header_map_size)        \
                       _f(get_buffer_status) _f(get_buffer_bytes) _f(set_buffer_bytes)              \
                           _f(http_call) _f(grpc_call) _f(grpc_stream) _f(grpc_close)               \
                               _f(grpc_cancel) _f(grpc_send) _f(set_tick_period_milliseconds)       \
                                   _f(get_current_time_nanoseconds) _f(define_metric)               \
                                       _f(increment_metric) _f(record_metric) _f(get_metric)        \
                                           _f(set_effective_context) _f(done)                       \
                                               _f(call_foreign_function)

 

文章来自个人专栏
envoy源码分析:线程模型与沙箱机制
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
3
1