和 Dubbo
这样的 RPC 协议(语义和 HTTP 类似),Aeraki
沿用其原生的CRD : VirtualService 和 DestinationRule,并根据CRD定义的路由规则和流量规则去生成对应的EnvoyFilter;对于非 RPC 协议,Aeraki
则定义了一些新的 CRD 来进行管理,例如针对redis服务定义了 RedisService 和 RedisDestination并根据里面的定义去生成对应的EnvoyFilter。
├── # README文档
├── cmd
│ └── aeraki # main函数入口
├── common-protos # 一些pb相关的
├── demo # aeraki提供的服务样例yaml文件
│ ├── aeraki-demo.json # grafana导出的配置
│ ├── dubbo # dubbo服务的yaml,包括dubbo的deployment,destinationRule,virtualService,serviceEntry
│ ├── gateway # kiali,prometheus,grafana的一些gateway,service
│ ├── # 安装aeraki脚本(包括istio、kiali、prometheus、grafana、dubbo、thrift、kafka等)
│ ├── kafka # kafka相关脚本
│ ├── thrift # thrift相关yaml
│ └── # 卸载脚本
├── docker
│ └── Dockerfile # aeraki的dockerfile
├── docs # 文档
├── go.mod
├── go.sum
├── k8s
│ └── aeraki.yaml # aeraki的yaml文件
├── pkg # aeraki的核心代码
│ ├── bootstrap # aeraki的server代码
│ ├── config # configController 监听Istio config xDS server的配置变更
│ ├── envoyfilter # envoyFilterController 生成对应的envoyFilter
│ ├── kube # 与k8s的apiserve的交互
│ └── model # 一些定义还有协议的识别(根据PortName)
├── plugin # 各个协议插件对应的Generator实例实现
│ ├── dubbo
│ ├── kafka
│ ├── redis
│ ├── thrift
│ └── zookeeper
├── test # 一些yaml文件与脚本
└── vendor # vendor
//------------------source: aeraki/cmd/aeraki/main.go-------------------------//
func main() {
args := bootstrap.NewAerakiArgs()
args.IstiodAddr = *flag.String("istiodaddr", defaultIstiodAddr, "Istiod xds server address")
args.Namespace = *flag.String("namespace", defaultNamespace, "Current namespace")
args.ElectionID = *flag.String("electionID", defaultElectionID, "ElectionID to elect master controller")
args.LogLevel = *flag.String("logLevel", defaultLogLevel, "Component log level")
// Create the stop channel for all of the servers.
stopChan := make(chan struct{}, 1)
args.Protocols = initGenerators()
server := bootstrap.NewServer(args)
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
stopChan <- struct{}{}
func initGenerators() map[protocol.Instance]envoyfilter.Generator {
return map[protocol.Instance]envoyfilter.Generator{
protocol.Dubbo: dubbo.NewGenerator(),
protocol.Thrift: thrift.NewGenerator(),
protocol.Kafka: kafka.NewGenerator(),
protocol.Zookeeper: zookeeper.NewGenerator(),
//------------------source: aeraki/pkg/bootstrap/server.go-----------------------//
type Server struct {
args *AerakiArgs
configController *config.Controller // configController监听istio配置变更
envoyFilterController *envoyfilter.Controller // 生成envoyfilter的controller
crdController manager.Manager // 管理自定义crd的controller
stopCRDController func()
// Aeraki运行需要的参数
type AerakiArgs struct {
IstiodAddr string
ListenAddr string
Namespace string
ElectionID string
LogLevel string
Protocols map[protocol.Instance]envoyfilter.Generator
// 新建server实例,初始化各个controller
func NewServer(args *AerakiArgs) *Server {
// configController实例化
configController := config.NewController(args.IstiodAddr)
// envoyFilterController实例化
envoyFilterController := envoyfilter.NewController(configController.Store, args.Protocols)
// crdController实例化
crdController := controller.NewManager(args.Namespace, args.ElectionID, func() error {
// 自定义的CRD资源由更新也会交给envoyFilterController去对应处理
return nil
cfg := crdController.GetConfig()
args.Protocols[protocol.Redis] = redis.New(cfg, configController.Store)
// configController事件处理handler,如果有配置添加/更新/删除则交给envoyFilterController去对应处理
configController.RegisterEventHandler(args.Protocols, func(_, curr istioconfig.Config, event model.Event) {
// 往envoyFilterController的pushChannel写入event
return &Server{
args: args,
configController: configController,
envoyFilterController: envoyFilterController,
crdController: crdController,
// Start starts all components of the Aeraki service. Serving can be canceled at any time by closing the provided stop channel.
// This method won't block
func (s *Server) Start(stop <-chan struct{}) {
aerakiLog.Info("Staring Aeraki Server")
go func() {
aerakiLog.Infof("Starting Envoy Filter Controller")
go func() {
aerakiLog.Infof("Watching xDS resource changes at %s", s.args.IstiodAddr)
ctx, cancel := context.WithCancel(context.Background())
s.stopCRDController = cancel
go func() {
_ = s.crdController.Start(ctx)
//------------------source: aeraki/pkg/config/configcontroller.go-----------------------//
// Controller watches Istio config xDS server and notifies the listeners when config changes.
type Controller struct {
configServerAddr string
Store istiomodel.ConfigStore // istiomodel =>
controller istiomodel.ConfigStoreCache // 监控ConfigStore
func (c *Controller) Run(stop <-chan struct{}) {
go func() {
for {
// 拿到istio的xdsMCP
xdsMCP, err := adsc.New(c.configServerAddr, &adsc.Config{
Meta: istiomodel.NodeMetadata{
Generator: "api",
InitialDiscoveryRequests: c.configInitialRequests(),
BackoffPolicy: backoff.NewConstantBackOff(time.Second),
if err != nil {
controllerLog.Errorf("failed to dial XDS %s %v", c.configServerAddr, err)
time.Sleep(5 * time.Second)
xdsMCP.Store = istiomodel.MakeIstioStore(c.controller)
if err = xdsMCP.Run(); err != nil {
controllerLog.Errorf("adsc: failed running %v", err)
time.Sleep(5 * time.Second)
type controller struct {
monitor Monitor
configStore model.ConfigStore
// NewController return an implementation of model.ConfigStoreCache
// This is a client-side monitor that dispatches events as the changes are being
// made on the client.
func NewController(cs model.ConfigStore) model.ConfigStoreCache {
out := &controller{
configStore: cs,
monitor: NewMonitor(cs),
return out
func (c *controller) Run(stop <-chan struct{}) {
// Monitor provides methods of manipulating changes in the config store
type Monitor interface {
Run(<-chan struct{})
AppendEventHandler(config2.GroupVersionKind, Handler)
func NewMonitor(store model.ConfigStore) Monitor {
return newBufferedMonitor(store, BufferSize, false)
func (m *configstoreMonitor) Run(stop <-chan struct{}) {
if m.sync {
for {
select {
case <-stop:
case ce, ok := <-m.eventCh:
if ok {
// eventChannel的event处理
func (m *configstoreMonitor) processConfigEvent(ce ConfigEvent) {
if _, exists := m.handlers[ce.config.GroupVersionKind]; !exists {
log.Warnf("Config GroupVersionKind %s does not exist in config store", ce.config.GroupVersionKind)
m.applyHandlers(ce.old, ce.config, ce.event)
//------------------source: aeraki/pkg/config/configcontroller.go-----------------------//
// RegisterEventHandler adds a handler to receive config update events for a configuration type
func (c *Controller) RegisterEventHandler(protocols map[protocol.Instance]envoyfilter.Generator,
handler func(istioconfig.Config, istioconfig.Config, istiomodel.Event)) {
handlerWrapper := func(prev istioconfig.Config, curr istioconfig.Config, event istiomodel.Event) {
if event == istiomodel.EventUpdate && reflect.DeepEqual(prev.Spec, curr.Spec) {
// ServiceEntry有变动
if curr.GroupVersionKind == collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind() {
service, ok := curr.Spec.(*networking.ServiceEntry)
if !ok {
// This should never happen,断言为ServiceEntry失败,不可能发生
controllerLog.Errorf("Failed in getting a virtual service: %v", curr.Name)
// 通过portName来识别对应的协议,必须以tcp-protocol-serviceXXX命名
for _, port := range service.Ports {
if !strings.HasPrefix(port.Name, "tcp") {
if _, ok := protocols[protocol.GetLayer7ProtocolFromPortName(port.Name)]; ok {
controllerLog.Infof("Matched protocol :%s %s %s", protocol.GetLayer7ProtocolFromPortName(port.Name), event.String(), curr.Name)
// 执行对应的handler
handler(prev, curr, event)
} else if curr.GroupVersionKind == collections.IstioNetworkingV1Alpha3Virtualservices.Resource().GroupVersionKind() {
// VirtualService有变动
controllerLog.Infof("Virtual Service changed: %s %s", event.String(), curr.Name)
vs, ok := curr.Spec.(*networking.VirtualService)
if !ok {
// This should never happen
controllerLog.Errorf("Failed in getting a virtual service: %v", event.String(), curr.Name)
// 获取所有的serviceEntries
serviceEntries, err := c.Store.List(collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(), "")
if err != nil {
controllerLog.Errorf("Failed to list configs: %v", err)
for _, config := range serviceEntries {
service, ok := config.Spec.(*networking.ServiceEntry)
if !ok { // should never happen
controllerLog.Errorf("failed in getting a service entry: %s: %v", config.Labels, err)
if len(service.Hosts) > 0 {
for _, host := range service.Hosts {
// 与virtualService中的host匹配上了,执行对应的handler
if host == vs.Hosts[0] {
for _, port := range service.Ports {
if _, ok := protocols[protocol.GetLayer7ProtocolFromPortName(port.Name)]; ok {
handler(prev, curr, event)
schemas := configCollection.All()
for _, schema := range schemas {
// 在event处理过程中注入handlerWrapper
c.controller.RegisterEventHandler(schema.Resource().GroupVersionKind(), handlerWrapper)
//------------------source: aeraki/pkg/envoyfilter/controller.go-----------------------//
// Controller contains the runtime configuration for the envoyFilter controller.
type Controller struct {
configStore istiomodel.ConfigStore
generators map[protocol.Instance]Generator
// server的RegisterEventHandler会往channel中写入event
pushChannel chan istiomodel.Event
func (s *Controller) Run(stop <-chan struct{}) {
go func() {
const (
// debounceAfter is the delay added to events to wait after a registry event for debouncing.
// This will delay the push by at least this interval, plus the time getting subsequent events.
// If no change is detected the push will happen, otherwise we'll keep delaying until things settle.
debounceAfter = 500 * time.Millisecond
// debounceMax is the maximum time to wait for events while debouncing.
// Defaults to 10 seconds. If events keep showing up with no break for this time, we'll trigger a push.
debounceMax = 10 * time.Second
func (s *Controller) mainLoop(stop <-chan struct{}) {
var timeChan <-chan time.Time
var startDebounce time.Time
var lastResourceUpdateTime time.Time
pushCounter := 0
debouncedEvents := 0
for {
select {
case <-stop:
case e := <-s.pushChannel:
controllerLog.Debugf("Receive event from push chanel : %v", e)
lastResourceUpdateTime = time.Now()
if debouncedEvents == 0 {
controllerLog.Debugf("This is the first debounced event")
startDebounce = lastResourceUpdateTime
timeChan = time.After(debounceAfter)
case <-timeChan:
controllerLog.Debugf("Receive event from time chanel")
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastResourceUpdateTime)
// it has been too long since the first debounced event or quiet enough since the last debounced event
if eventDelay >= debounceMax || quietTime >= debounceAfter {
if debouncedEvents > 0 {
controllerLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push",
pushCounter, debouncedEvents, quietTime, eventDelay)
// 推送envoyFilter到k8s的apiserver
err := s.pushEnvoyFilters2APIServer()
if err != nil {
// 有错误重新加到push-Channel重试
controllerLog.Errorf("%v", err)
// Retry if failed to push envoyFilters to APIServer
debouncedEvents = 0
} else {
timeChan = time.After(debounceAfter - quietTime)
//------------------source: aeraki/pkg/envoyfilter/controller.go-----------------------//
// 生成envoyFilter
func (s *Controller) generateEnvoyFilters() (map[string]*model.EnvoyFilterWrapper, error) {
envoyFilters := make(map[string]*model.EnvoyFilterWrapper)
// 获取serviceEntries
serviceEntries, err := s.configStore.List(collections.IstioNetworkingV1Alpha3Serviceentries.Resource().GroupVersionKind(), "")
if err != nil {
return envoyFilters, fmt.Errorf("failed to list configs: %v", err)
for _, config := range serviceEntries {
service, ok := config.Spec.(*networking.ServiceEntry)
if !ok { // should never happen
return envoyFilters, fmt.Errorf("failed in getting a service entry: %s: %v", config.Labels, err)
if len(service.Hosts) == 0 {
controllerLog.Errorf("host should not be empty: %s", config.Name)
// We can't retry in this scenario
return envoyFilters, nil
if len(service.Hosts) > 1 {
controllerLog.Warnf("multiple hosts found for service: %s, only the first one will be processed", config.Name)
// 也是根据host去找到相关的VirtualService
relatedVs, err := s.findRelatedVirtualService(service)
if err != nil {
return envoyFilters, fmt.Errorf("failed in finding the related virtual service : %s: %v", config.Name, err)
// 开始生成EnvoyFilter
context := &model.EnvoyFilterContext{
ServiceEntry: &model.ServiceEntryWrapper{
Meta: config.Meta,
Spec: service,
VirtualService: relatedVs,
for _, port := range service.Ports {
instance := protocol.GetLayer7ProtocolFromPortName(port.Name)
if generator, ok := s.generators[instance]; ok {
// 由各个协议的generator去处理
envoyFilterWrappers := generator.Generate(context)
for _, wrapper := range envoyFilterWrappers {
envoyFilters[wrapper.Name] = wrapper
return envoyFilters, nil
// 推送到APIserver
func (s *Controller) pushEnvoyFilters2APIServer() error {
generatedEnvoyFilters, err := s.generateEnvoyFilters()
if err != nil {
return fmt.Errorf("failed to generate EnvoyFilter: %v", err)
config, err := config.GetConfig()
if err != nil {
return fmt.Errorf("can not get kubernetes config: %v", err)
ic, err := versionedclient.NewForConfig(config)
if err != nil {
return fmt.Errorf("failed to create istio client: %v", err)
// 获取原先存在的envoyFilter
existingEnvoyFilters, _ := ic.NetworkingV1alpha3().EnvoyFilters(configRootNS).List(context.TODO(), v1.ListOptions{
LabelSelector: "manager=" + aerakiFieldManager,
for _, oldEnvoyFilter := range existingEnvoyFilters.Items {
// 删除
if newEnvoyFilter, ok := generatedEnvoyFilters[oldEnvoyFilter.Name]; !ok {
controllerLog.Infof("Deleting EnvoyFilter: %v", oldEnvoyFilter)
err = ic.NetworkingV1alpha3().EnvoyFilters(configRootNS).Delete(context.TODO(), oldEnvoyFilter.Name,
if err != nil {
err = fmt.Errorf("failed to create istio client: %v", err)
} else {
// 更新
if !proto.Equal(newEnvoyFilter.Envoyfilter, &oldEnvoyFilter.Spec) {
controllerLog.Infof("Updating EnvoyFilter: %v", *newEnvoyFilter.Envoyfilter)
_, err = ic.NetworkingV1alpha3().EnvoyFilters(configRootNS).Update(context.TODO(),
s.toEnvoyFilterCRD(newEnvoyFilter, &oldEnvoyFilter),
v1.UpdateOptions{FieldManager: aerakiFieldManager})
if err != nil {
err = fmt.Errorf("failed to update EnvoyFilter: %v", err)
} else {
// 无变更
controllerLog.Infof("EnvoyFilter: %s unchanged", oldEnvoyFilter.Name)
delete(generatedEnvoyFilters, oldEnvoyFilter.Name)
// 剩下的是之前不存在的则新增
for _, wrapper := range generatedEnvoyFilters {
_, err = ic.NetworkingV1alpha3().EnvoyFilters(configRootNS).Create(context.TODO(), s.toEnvoyFilterCRD(wrapper, nil),
v1.CreateOptions{FieldManager: aerakiFieldManager})
controllerLog.Infof("Creating EnvoyFilter: %v", *wrapper.Envoyfilter)
if err != nil {
err = fmt.Errorf("failed to create EnvoyFilter: %v", err)
return err
//------------------source: aeraki/pkg/envoyfilter/generator.go-----------------------//
// Generator generates protocol specified envoyfilters
type Generator interface {
Generate(context *model.EnvoyFilterContext) []*model.EnvoyFilterWrapper
//------------------source: aeraki/pkg/envoyfilter/network_filter.go-----------------------//
// network_filter 通用的生成envoyFilter方法
func generateNetworkFilter(service *networking.ServiceEntry, outboundProxy proto.Message,
inboundProxy proto.Message, filterName string, filterType string, operation networking.EnvoyFilter_Patch_Operation) []*model.EnvoyFilterWrapper {
var outboundProxyPatch, inboundProxyPatch *networking.EnvoyFilter_EnvoyConfigObjectPatch
if outboundProxy != nil {
outboundProxyStruct, err := generateValue(outboundProxy, filterName, filterType)
if err != nil {
//This should not happen
generatorLog.Errorf("Failed to generate outbound EnvoyFilter: %v", err)
return nil
outboundListenerName := service.GetAddresses()[0] + "_" + strconv.Itoa(int(service.Ports[0].Number))
outboundProxyPatch = &networking.EnvoyFilter_EnvoyConfigObjectPatch{
ApplyTo: networking.EnvoyFilter_NETWORK_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
Name: outboundListenerName,
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: wellknown.TCPProxy,
Patch: &networking.EnvoyFilter_Patch{
Operation: operation,
Value: outboundProxyStruct,
if inboundProxy != nil {
inboundProxyStruct, err := generateValue(inboundProxy, filterName, filterType)
if err != nil {
//This should not happen
generatorLog.Errorf("Failed to generate inbound EnvoyFilter: %v", err)
return nil
inboundProxyPatch = &networking.EnvoyFilter_EnvoyConfigObjectPatch{
ApplyTo: networking.EnvoyFilter_NETWORK_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
Name: "virtualInbound",
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
DestinationPort: service.Ports[0].Number,
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: wellknown.TCPProxy,
Patch: &networking.EnvoyFilter_Patch{
Operation: operation,
Value: inboundProxyStruct,
if outboundProxyPatch != nil && inboundProxyPatch != nil {
return []*model.EnvoyFilterWrapper{
Name: outboundEnvoyFilterName(service),
Envoyfilter: &networking.EnvoyFilter{
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{outboundProxyPatch},
Name: inboundEnvoyFilterName(service),
Envoyfilter: &networking.EnvoyFilter{
WorkloadSelector: service.WorkloadSelector,
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{inboundProxyPatch},
if outboundProxyPatch != nil {
return []*model.EnvoyFilterWrapper{
Name: outboundEnvoyFilterName(service),
Envoyfilter: &networking.EnvoyFilter{
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{outboundProxyPatch},
if inboundProxyPatch != nil {
return []*model.EnvoyFilterWrapper{
Name: inboundEnvoyFilterName(service),
Envoyfilter: &networking.EnvoyFilter{
WorkloadSelector: service.WorkloadSelector,
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{inboundProxyPatch},
return nil
// Generate create EnvoyFilters for Dubbo services
func (*Generator) Generate(context *model.EnvoyFilterContext) []*model.EnvoyFilterWrapper {
return envoyfilter.GenerateReplaceNetworkFilter(
// 以OutboundProxy为例,生成envoy需要的dubbo路由配置
func buildOutboundProxy(context *model.EnvoyFilterContext) *dubbo.DubboProxy {
route, err := buildOutboundRouteConfig(context)
if err != nil {
generatorLog.Errorf("Failed to generate Dubbo EnvoyFilter: %v, %v", context.ServiceEntry, err)
return nil
return &dubbo.DubboProxy{
StatPrefix: model.BuildClusterName(model.TrafficDirectionOutbound, "",
context.ServiceEntry.Spec.Hosts[0], int(context.ServiceEntry.Spec.Ports[0].Number)),
ProtocolType: dubbo.ProtocolType_Dubbo,
SerializationType: dubbo.SerializationType_Hessian2,
// we only support one to one mapping of interface and service. If there're multiple interfaces in one process,
// these interfaces can be defined in separated services, one service for one interface.
RouteConfig: []*dubbo.RouteConfiguration{
func buildOutboundRouteConfig(context *model.EnvoyFilterContext) (*dubbo.RouteConfiguration, error) {
// dubbo service interface should be passed in via serviceentry annotation
var serviceInterface string
var exist bool
if serviceInterface, exist = context.ServiceEntry.Annotations["interface"]; !exist {
err := fmt.Errorf("no interface annotation")
return nil, err
var route []*dubbo.Route
clusterName := model.BuildClusterName(model.TrafficDirectionOutbound, "", context.ServiceEntry.Spec.Hosts[0], int(context.ServiceEntry.Spec.Ports[0].Number))
if context.VirtualService == nil {
route = []*dubbo.Route{defaultRoute(clusterName)}
} else {
route = buildRoute(context)
return &dubbo.RouteConfiguration{
Name: clusterName,
Interface: serviceInterface, // To make this work, Dubbo Interface should have been registered to the Istio service registry as a service
Routes: route,
}, nil
//------------------source: aeraki/pkg/kube/controller/controller.go-----------------------//
func NewManager(namespace string, electionID string, triggerPush func() error) manager.Manager {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
controllerLog.Fatalf("Could not get apiserver config: %v\n", err)
return nil
mgrOpt := manager.Options{
MetricsBindAddress: "0",
LeaderElection: true,
LeaderElectionNamespace: namespace,
LeaderElectionID: electionID,
m, err := manager.New(cfg, mgrOpt)
if err != nil {
controllerLog.Fatalf("Could not create a controller manager: %v", err)
return nil
err = addRedisServiceController(m, triggerPush)
if err != nil {
controllerLog.Fatalf("Could not add RedisServiceController: %e", err)
return nil
err = addRedisDestinationController(m, triggerPush)
if err != nil {
controllerLog.Fatalf("Could not add RedisDestinationController: %e", err)
return nil
err = scheme.AddToScheme(m.GetScheme())
if err != nil {
controllerLog.Fatalf("Could not add schema: %e", err)
return m
//------------------source: aeraki/pkg/envoyfilter/redis.go-----------------------//
// 以redis的destination为例
func addRedisDestinationController(mgr manager.Manager, triggerPush func() error) error {
redisCtrl := &RedisController{triggerPush: triggerPush}
c, err := controller.New("aeraki-redis-destination-controller", mgr, controller.Options{Reconciler: redisCtrl})
if err != nil {
return err
// Watch for changes to primary resource IstioFilter
err = c.Watch(&source.Kind{Type: &v1alpha1.RedisDestination{}}, &handler.EnqueueRequestForObject{}, redisPredicates)
if err != nil {
return err
controllerLog.Infof("RedisDestinationController registered")
return nil