0%

dapr中使用分布式配置中心

本文记录gozero微服务在k8s中使用dapr的配置组件,做为分布式配置中心完整方案;因为agile使用的pgsql的数据库,这里首先在agile数据库中新建pgsql表,建立触发器通知dapr,agile微服务中改进为发布时往keyvalue表中插入配置以触发通知dapr,dapr再通知各个关注的微服务。
gozero微服务中通过dapr sdk,解析获得的配置数据。

1.建立dapr使用的pgsql配置表

  • 1.1 建立表(agc_config_published_ex)

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE IF NOT EXISTS agc_config_published_ex (
    KEY VARCHAR NOT NULL,
    VALUE VARCHAR NOT NULL,
    VERSION VARCHAR NOT NULL,
    METADATA JSON
    );

  • 1.2 建立触发器函数(notify_event)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
    DECLARE
    payload json;
    notification json;

    BEGIN
    payload = row_to_json(NEW);
    notification = json_build_object(
    'table',TG_TABLE_NAME,
    'action', TG_OP,
    'data', payload);
    PERFORM pg_notify('agile_config_trigger',notification::text);
    RETURN NULL;
    END;
    $$ LANGUAGE plpgsql;

  • 1.3 关联表和触发器(agc_config_published_trigger)

    1
    2
    3
    4
    CREATE TRIGGER notify_event_trigger
    AFTER INSERT ON agc_config_published_ex
    FOR EACH ROW EXECUTE PROCEDURE notify_event();

  • 1.4 当在agile上发布配置信息时更新表(agc_config_published_ex)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    CREATE OR REPLACE FUNCTION "public"."update_publish_event"()
    RETURNS "pg_catalog"."trigger" AS $$
    DECLARE
    new_app_id varchar;
    new_key varchar;
    num int;
    new_json json;

    BEGIN
    new_app_id := replace(lower(NEW.app_id), '-', '_');
    new_key := new_app_id || '.' || lower(NEW.env) || '.' || NEW."g" || '.' || NEW."k";
    new_json := json_build_object('publish_timeline_id',NEW."publish_timeline_id");

    SELECT count(*) INTO num FROM agc_config_published_ex where "key" = new_key;
    if num = 0 THEN
    INSERT INTO agc_config_published_ex ("key","value","version","metadata") VALUES (new_key,NEW.v,NEW.version,new_json);
    ELSE
    UPDATE agc_config_published_ex set "value"=NEW.v,"version"=NEW."version",metadata=new_json WHERE "key"=new_key;
    END IF;
    RETURN NULL;
    END;
    $$
    LANGUAGE plpgsql

  • 1.5 关联触发器安装到表(agc_config_published)上

    1
    2
    3
    CREATE TRIGGER published_trigger
    AFTER INSERT ON agc_config_published
    FOR EACH ROW EXECUTE PROCEDURE update_publish_event();

2.dapr在configure组件在k8s中的部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: configstore
namespace: baas
spec:
type: configuration.postgresql
version: v1
metadata:
- name: table
value: "agc_config_published_ex"
- name: connectionString
value: "host=$(HOST) user=$(USERNAME) password=$(PWD) port=5678 connect_timeout=10 database=zhyypt-agile-config sslmode=disable"
scopes:
- test-secrets
# scopes:
# - service-api #可以限定某些应用

3.gozero中的代码

  • 3.1 常量定义

    1
    2
    3
    4
    5
    var (
    DAPR_CONFIGURATION_STORE = "configstore" // dapr配置名
    PGNOTIFY_CHANNEL = "agile_config_trigger" //notify_event函数中的pg_notify的频道名字
    syncOnce sync.Once
    )
  • 3.2 从pgsql中订阅agile的数据更新

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (c *Config) subcribeFromAgile(keys map[string]string) error {
    ctx := context.Background()
    client, err := dapr.NewClient()
    if err != nil {
    return err
    }

    err = client.Wait(ctx, 10*time.Second)
    if err != nil {
    return err
    }

    real_keys := make([]string, 0)
    for _, v := range keys {
    real_keys = append(real_keys, v)
    }

    items, err := client.GetConfigurationItems(ctx, DAPR_CONFIGURATION_STORE, real_keys)
    if err != nil {
    return err
    }
    c.setValues(keys, items)

    go doSubcribe(ctx, client, keys, c)
    return nil
    }
点击下载完整代码
  • 3.3 使用方法
    1
    2
    3
    var c config.Config
    conf.MustLoad(*configFile, &c)
    c.Reload("test-secrets", "dev")

总结
注意事项:

  • agc_config_published_ex表中version字段必须是字符串类型
  • key的名字规范是不能有’-‘以及其他特殊字符。
  • 当多个服务通用时,需要什么配置就拿取什么key的配置,当前是使用service_id.env.group.key的形式作为唯一key来适配agile;
  • 在dapr源码中字段是采用select * from这种类型,表结构中只能包含四个字段。