ubus实现进程间通信举例


请尊重原创版权,转载注明出处。

    上一篇文章 介绍了ubus的组件和实现原理,本文通过代码实例介绍使用ubus进行进程间通信的三种方式。

invoke的方式实现端对端通信

    最简单的情景就是一个提供服务的server端,一个请求服务的client端,client请求server的服务。

下面的例子中,server注册了一个名为“scan_prog”的对象,该对象中提供一个“scan”方法:

ubus_invoke.h:

#ifndef __UBUS_INVOKE_H__  
#define __UBUS_INVOKE_H__  
#include <json/json.h>  
#include <libubox/blobmsg_json.h>  
  
  
struct prog_attr {  
<span style="white-space:pre">  </span>char name[64];  
<span style="white-space:pre">  </span>int chn_id;  
};  
#define PROG_MAX<span style="white-space:pre">  </span>8  
  
  
#endif  /* __UBUS_INVOKE_H__ */  

invoke_server.c:

#include <libubox/uloop.h>  
#include <libubox/ustream.h>  
#include <libubox/utils.h>  
#include <libubus.h>  
#include <json/json.h>  
#include <libubox/blobmsg_json.h>  
#include "ubus_invoke.h"  
  
static struct ubus_context * ctx = NULL;  
static struct blob_buf b;  
static const char * sock_path;  
  
static struct prog_attr uri_list[PROG_MAX] =   
{  
    {"program_beijing", 1},  
    {"program_guangzhou", 2},  
    {"program_shanghai", 3},  
    {"", -1},  
};  
  
enum  
{  
    SCAN_CHNID,  
    SCAN_POLICY_MAX,  
};  
  
static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {  
    [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},  
};  
  
static int ubus_start_scan(struct ubus_context *ctx, struct ubus_object *obj,  
              struct ubus_request_data *req, const char *method,  
              struct blob_attr *msg)  
{  
    int ret = -1;  
    void * json_list = NULL;  
    void * json_uri = NULL;  
    int idx;  
  
    blob_buf_init(&b, 0);  
      
    struct blob_attr *tb[SCAN_POLICY_MAX];  
    blobmsg_parse(scan_policy, SCAN_POLICY_MAX, tb, blob_data(msg), blob_len(msg));  
  
    /* 
    本例子中,如果请求特定的节目号,返回节目名称。 
    如果请求节目号是0,则返回所有节目列表。 
    */  
    if (tb[SCAN_CHNID] != NULL)  
    {  
        int chnid = blobmsg_get_u32(tb[SCAN_CHNID]);  
  
        if (chnid == 0)  
        {  
            json_uri = blobmsg_open_array(&b, "prog_list");  
            for (idx = 0; idx < PROG_MAX; idx++)  
            {  
                if ('\0' != uri_list[idx].name[0])  
                {  
                    json_list = blobmsg_open_table(&b, NULL);  
                    blobmsg_add_string(&b, "name", uri_list[idx].name);  
                    blobmsg_add_u32(&b, "channel", uri_list[idx].chn_id);  
                    blobmsg_close_table(&b, json_list);  
                }  
            }  
            blobmsg_close_array(&b, json_uri);  
            ret = 0;  
        }  
        else  
        {  
            for (idx = 0; idx < PROG_MAX; idx++)  
            {  
                if ('\0' != uri_list[idx].name[0] && uri_list[idx].chn_id == chnid)  
                {  
                    blobmsg_add_string(&b, "name", uri_list[idx].name);  
                    ret = 0;  
                }  
            }  
        }  
    }  
      
    blobmsg_add_u32(&b, "result", ret);  
    ubus_send_reply(ctx, req, b.head);  
      
    return 0;  
}  
  
/* 方法列表 */  
static struct ubus_method scan_methods[] =   
{  
    UBUS_METHOD("scan", ubus_start_scan, scan_policy),  
};  
  
/* type目前没有实际用处 */  
static struct ubus_object_type scan_obj_type  
= UBUS_OBJECT_TYPE("scan_prog", scan_methods);  
  
static struct ubus_object scan_obj =   
{  
    .name = "scan_prog", /* 对象的名字 */  
    .type = &scan_obj_type,  
    .methods = scan_methods,  
    .n_methods = ARRAY_SIZE(scan_methods),  
};  
  
static void ubus_add_fd(void)  
{  
    ubus_add_uloop(ctx);  
  
#ifdef FD_CLOEXEC  
    fcntl(ctx->sock.fd, F_SETFD,  
        fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);  
#endif  
}  
  
static void ubus_reconn_timer(struct uloop_timeout *timeout)  
{  
    static struct uloop_timeout retry =  
    {  
        .cb = ubus_reconn_timer,  
    };  
    int t = 2;  
  
    if (ubus_reconnect(ctx, sock_path) != 0) {  
        uloop_timeout_set(&retry, t * 1000);  
        return;  
    }  
  
    ubus_add_fd();  
}  
  
static void ubus_connection_lost(struct ubus_context *ctx)  
{  
    ubus_reconn_timer(NULL);  
}  
  
static int display_ubus_init(const char *path)  
{  
    uloop_init();  
    sock_path = path;  
  
    ctx = ubus_connect(path);  
    if (!ctx)  
    {  
        printf("ubus connect failed\n");  
        return -1;  
    }  
      
    printf("connected as %08x\n", ctx->local_id);  
    ctx->connection_lost = ubus_connection_lost;  
  
    ubus_add_fd();  
  
    /* 向ubusd注册对象和方法,供其他ubus客户端调用。 */  
    if (ubus_add_object(ctx, &scan_obj) != 0)  
    {  
        printf("ubus add obj failed\n");  
        return -1;  
    }  
  
    return 0;  
}  
  
static void display_ubus_done(void)  
{  
    if (ctx)  
        ubus_free(ctx);  
}  
  
int main(int argc, char * argv[])  
{  
    char * path = NULL;  
      
    if (-1 == display_ubus_init(path))  
    {  
        printf("ubus connect failed!\n");  
        return -1;  
    }  
  
    uloop_run();  
  
    display_ubus_done();  
  
    return 0;  
}  

客户端代码invoke_client.c去调用server端的”scan_prog”对象的”scan”方法:

#include <libubox/uloop.h>  
#include <libubox/ustream.h>  
#include <libubox/utils.h>  
#include <libubus.h>  
#include <json/json.h>  
#include <libubox/blobmsg_json.h>  
#include "ubus_invoke.h"  
  
static struct ubus_context * ctx = NULL;  
static struct blob_buf b;  
static const char * cli_path;  
  
enum  
{  
    SCAN_CHNID,  
    SCAN_POLICY_MAX,  
};  
  
static const struct blobmsg_policy scan_policy[SCAN_POLICY_MAX] = {  
    [SCAN_CHNID] = {.name = "chnID", .type = BLOBMSG_TYPE_INT32},  
};  
  
static int timeout = 30;  
static bool simple_output = false;  
  
static void scanreq_prog_cb(struct ubus_request *req, int type, struct blob_attr *msg)  
{  
    char *str;  
    if (!msg)  
        return;  
  
    /*  
    在这里处理返回的消息。 
    本例子只是将返回的消息打印出来。 
    */  
    str = blobmsg_format_json_indent(msg, true, simple_output ? -1 : 0);  
    printf("%s\n", str);  
    free(str);  
}  
  
static int client_ubus_call()  
{  
    unsigned int id;  
    int ret;  
  
    blob_buf_init(&b, 0);  
  
    /* 需要传递的参数 */  
    blobmsg_add_u32(&b, scan_policy[SCAN_CHNID].name, 0);  
  
    /* 
    向ubusd查询是否存在"scan_prog"这个对象, 
    如果存在,返回其id 
    */  
    ret = ubus_lookup_id(ctx, "scan_prog", &id);  
    if (ret != UBUS_STATUS_OK) {  
        printf("lookup scan_prog failed\n");  
        return ret;  
    }  
    else {  
        printf("lookup scan_prog successs\n");  
    }  
      
    /* 调用"scan_prog"对象的"scan"方法 */  
    return ubus_invoke(ctx, id, "scan", b.head, scanreq_prog_cb, NULL, timeout * 1000);  
}  
  
/* 
ubus_invoke()的声明如下: 
int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method, 
                struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout); 
                 
其中cb参数是消息回调函数,其函数类型定义为: 
typedef void (*ubus_data_handler_t)(struct ubus_request *req, 
                    int type, struct blob_attr *msg); 
参数type是请求消息的类型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。 
 
参数msg存放从服务端得到的回复消息,struct blob_attr类型,同样用blobmsg_parse()来解析。 
 
参数req保存了请求方的消息属性,其中req->priv即ubus_invoke()中的priv参数, 
用这个参数可以零活的传递一些额外的数据。 
*/  
  
static int client_ubus_init(const char *path)  
{  
    uloop_init();  
    cli_path = path;  
  
    ctx = ubus_connect(path);  
    if (!ctx)  
    {  
        printf("ubus connect failed\n");  
        return -1;  
    }  
      
    printf("connected as %08x\n", ctx->local_id);  
  
    return 0;  
}  
  
static void client_ubus_done(void)  
{  
    if (ctx)  
        ubus_free(ctx);  
}  
  
int main(int argc, char * argv[])  
{  
    /* ubusd创建的unix socket,默认值为"/var/run/ubus.sock" */  
    char * path = NULL;  
  
    /* 连接ubusd */  
    if (UBUS_STATUS_OK != client_ubus_init(path))  
    {  
        printf("ubus connect failed!\n");  
        return -1;  
    }  
  
    /* 调用某个ubus方法并处理返回结果 */  
    client_ubus_call();  
  
    client_ubus_done();  
  
    return 0;  
}  

    先执行server程序,再执行client程序,可以看到client发出请求后, server返回了相应的节目号,在client打印出了接收到的消息。

也可以通过shell命令来请求server的服务,例如:

ubus call scan_prog scan '{"chnID": 0}'

这条命令和执行client程序的作用相同。

subscribe/notify的方式实现订阅

    ubus支持以订阅的方式进行进程间通信,通信方式如下图:

T1

    被订阅者(server)又称为notifier,订阅者(client)又称为subscriber。 这样一来,可以同时有多个client订阅server的某个服务,当server通过该服务广播消息时, 所有client都会被通知,并执行各自的处理。

主要过程为:

server进程:

  1. 连接ubusd。
  2. 注册一个object,用于client订阅。
  3. server可以向订阅者广播消息。
  4. 等待消息。

client进程:

  1. 连接ubusd。
  2. 向server订阅object,并定义收到server的消息时的处理函数。
  3. 等待消息。

    代码示例:下面这个例子中,server注册了一个名为“test”的对象,并定期广播消息。 而client去订阅这个对象,并对server发出的消息做处理。

notify_server.c:

#include <unistd.h>  
#include <libubox/blobmsg_json.h>  
#include <libubox/uloop.h>  
#include <libubus.h>  
  
static struct ubus_context *ctx;  
  
static void test_client_subscribe_cb(struct ubus_context *ctx, struct ubus_object *obj)  
{  
    fprintf(stderr, "Subscribers active: %d\n", obj->has_subscribers);  
}  
  
/* 这个空的method列表,只是为了让object有个名字,这样client可以通过object name来订阅。 */  
static struct ubus_method test_methods[] = {};  
  
static struct ubus_object_type test_obj_type =   
    UBUS_OBJECT_TYPE("test", test_methods);  
  
static struct ubus_object test_object = {  
    .name = "test", /* object的名字 */  
    .type = &test_obj_type,  
    .subscribe_cb = test_client_subscribe_cb,  
};  
  
static void notifier_main(void)  
{  
    int ret;  
  
    /* 注册一个object,client可以订阅这个object */  
    ret = ubus_add_object(ctx, &test_object);  
    if (ret) {  
        fprintf(stderr, "Failed to add object: %s\n", ubus_strerror(ret));  
        return;  
    }  
  
    /* 在需要的时候,向所有客户端发送notify消息 */  
      
    /* step1: 如果需要传递参数,则保存到struct blob_attr类型的结构体中。 */  
  
    /*  
    int ubus_notify(struct ubus_context *ctx, struct ubus_object *obj, const char *type, struct blob_attr *msg, int timeout); 
    type是一个字符串,自定义的。msg是需要携带的参数。如果需要等待回复,timeout需设置为>=0。 
    */  
    while (1) {  
        sleep(2);  
        /* step2: 广播notification消息。 */  
        ubus_notify(ctx,  &test_object, "say Hi!", NULL, -1);  
    }  
}  
  
int main(int argc, char **argv)  
{  
    const char *ubus_socket = NULL;  
  
    uloop_init();  
  
    ctx = ubus_connect(ubus_socket);  
    if (!ctx) {  
        fprintf(stderr, "Failed to connect to ubus\n");  
        return -1;  
    }  
  
    ubus_add_uloop(ctx);  
  
    notifier_main();  
      
    uloop_run();  
  
    ubus_free(ctx);  
    uloop_done();  
  
    return 0;  
}  

notify_client.c:客户端订阅“test”对象,在收到3次消息后,随即取消对“test”对象的订阅。

#include <unistd.h>  
#include <libubox/blobmsg_json.h>  
#include <libubox/uloop.h>  
#include <libubus.h>  
  
static struct ubus_context *ctx;  
  
static int counter = 0;  
static uint32_t obj_id;  
static struct ubus_subscriber test_event;  
  
static int test_notify(struct ubus_context *ctx, struct ubus_object *obj,  
                  struct ubus_request_data *req,  
                  const char *method, struct blob_attr *msg)  
{  
    printf("notify handler...\n");  
    counter++;  
    if (counter > 3)  
        ubus_unsubscribe(ctx, &test_event, obj_id); /* 取消订阅 */  
    return 0;  
}  
  
static void test_handle_remove(struct ubus_context *ctx,  
                      struct ubus_subscriber *obj, uint32_t id)  
{  
    printf("remove handler...\n");  
}  
  
static void subscriber_main(void)  
{  
    int ret;  
      
    /* 通知到来时的处理函数。 */  
    test_event.cb = test_notify;  
    test_event.remove_cb = test_handle_remove; //server主动发起删除该client的订阅的cb函数(如server退出的时候)  
  
    /* 注册test_event */  
    ret = ubus_register_subscriber(ctx, &test_event);  
    if (ret)  
        fprintf(stderr, "Failed to add watch handler: %s\n", ubus_strerror(ret));  
      
    /* 得到要订阅的object的id */  
    ret = ubus_lookup_id(ctx, "test", &obj_id);  
    if (ret)  
        fprintf(stderr, "Failed to lookup object: %s\n", ubus_strerror(ret));  
  
    /* 订阅object */  
    ret = ubus_subscribe(ctx, &test_event, obj_id);  
    if (ret)  
        fprintf(stderr, "Failed to subscribe: %s\n", ubus_strerror(ret));  
}  
  
int main(int argc, char **argv)  
{  
    const char *ubus_socket = NULL;  
  
    uloop_init();  
  
    ctx = ubus_connect(ubus_socket);  
    if (!ctx) {  
        fprintf(stderr, "Failed to connect to ubus\n");  
        return -1;  
    }  
  
    ubus_add_uloop(ctx);  
  
    subscriber_main();  
      
    uloop_run();  
  
    ubus_free(ctx);  
    uloop_done();  
  
    return 0;  
}  

    先运行server&,注册可订阅的对象“test”,并随即每2秒向外广播通知消息。这时还没有client订阅这个对象。 运行多个client程序,由于每个client都订阅了“test”对象,则所有client都会收到server发出的消息。 当client取消订阅后,则不再收到server端的消息。

event的方式实现事件通知

    event机制从一对一的进程之间通信来讲,和invoke机制类似。不过event机制中, 发送方不需要知道谁要接收这个消息,实际上就是一个广播消息。这类似于U盘的热插拔:当插上或拔出U盘时, 内核会广播一个NETLINK事件,之后内核继续做自己的事情,而不关心谁会去监听和处理这个事件。

下面的例子中,client端同时监听了“add_device”和“rm_device”两个事件,而server端会触发“add_device” 事件并携带device的一些信息发送出去。

event_client.c:

#include <libubox/uloop.h>  
#include <libubox/ustream.h>  
#include <libubox/utils.h>  
#include <libubus.h>  
#include <json/json.h>  
#include <libubox/blobmsg_json.h>  
  
static struct ubus_context * ctx = NULL;  
static const char * cli_path;  
  
#define UBUS_EVENT_ADD_DEVICE   "add_device"  
#define UBUS_EVENT_REMOVE_DEVICE    "rm_device"  
  
static void ubus_probe_device_event(struct ubus_context *ctx, struct ubus_event_handler *ev,  
              const char *type, struct blob_attr *msg)  
{  
    char *str;  
  
    if (!msg)  
        return;  
  
    /*  
    在这里实现收到事件后的动作。 
    event也可以传递消息,放在msg中。 
     
    本例子只是将返回的消息打印出来。 
    */  
    str = blobmsg_format_json(msg, true);  
    printf("{ \"%s\": %s }\n", type, str);  
    free(str);  
}  
  
static int client_ubus_register_events()  
{  
    static struct ubus_event_handler listener;  
    int ret = 0;  
  
    /* 注册特定event的listener。多个event可以使用同一个listener */  
    memset(&listener, 0, sizeof(listener));  
    listener.cb = ubus_probe_device_event;  
      
    ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_ADD_DEVICE);  
    if (ret)  
    {  
        fprintf(stderr, "register event failed.\n");  
        return -1;  
    }  
  
    ret = ubus_register_event_handler(ctx, &listener, UBUS_EVENT_REMOVE_DEVICE);  
    if (ret)  
    {  
        ubus_unregister_event_handler(ctx, &listener);  
        fprintf(stderr, "register event failed.\n");  
        return -1;  
    }  
  
    return 0;  
}  
  
static void ubus_add_fd(void)  
{  
    ubus_add_uloop(ctx);  
  
#ifdef FD_CLOEXEC  
    fcntl(ctx->sock.fd, F_SETFD,  
        fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);  
#endif  
}  
  
static void ubus_reconn_timer(struct uloop_timeout *timeout)  
{  
    static struct uloop_timeout retry =  
    {  
        .cb = ubus_reconn_timer,  
    };  
    int t = 2;  
  
    if (ubus_reconnect(ctx, cli_path) != 0) {  
        uloop_timeout_set(&retry, t * 1000);  
        return;  
    }  
  
    ubus_add_fd();  
}  
  
static void ubus_connection_lost(struct ubus_context *ctx)  
{  
    ubus_reconn_timer(NULL);  
}  
  
static int client_ubus_init(const char *path)  
{  
    uloop_init();  
    cli_path = path;  
      
    ctx = ubus_connect(path);  
    if (!ctx)  
    {  
        printf("ubus connect failed\n");  
        return -1;  
    }  
      
    printf("connected as %08x\n", ctx->local_id);  
    ctx->connection_lost = ubus_connection_lost;  
  
    ubus_add_fd();  
  
    return 0;  
}  
  
static void client_ubus_done(void)  
{  
    if (ctx)  
        ubus_free(ctx);  
}  
  
int main(int argc, char * argv[])  
{  
    char * path = NULL;  
  
    /* 连接ubusd */  
    if (UBUS_STATUS_OK != client_ubus_init(path))  
    {  
        printf("ubus connect failed!\n");  
        return -1;  
    }  
  
    /* 注册某个事件的处理函数 */  
    client_ubus_register_events();  
  
    uloop_run();  
      
    client_ubus_done();  
  
    return 0;  
}  

event_server.c:

#include <libubox/uloop.h>  
#include <libubox/ustream.h>  
#include <libubox/utils.h>  
#include <libubus.h>  
#include <json/json.h>  
#include <libubox/blobmsg_json.h>  
  
static struct ubus_context * ctx = NULL;  
static struct blob_buf b;  
static const char * sock_path;  
  
static int server_ubus_send_event(void)  
{  
    blob_buf_init(&b, 0);  
  
    /* 需要传递的参数 */  
    blobmsg_add_u32(&b, "major", 3);  
    blobmsg_add_u32(&b, "minor", 56);  
    blobmsg_add_string(&b, "name", "mmc01");  
  
    /* 广播名为"add_device"的事件 */  
    return ubus_send_event(ctx, "add_device", b.head);  
}  
  
static int display_ubus_init(const char *path)  
{  
    uloop_init();  
    sock_path = path;  
  
    ctx = ubus_connect(path);  
    if (!ctx)  
    {  
        printf("ubus connect failed\n");  
        return -1;  
    }  
      
    printf("connected as %08x\n", ctx->local_id);  
  
    return 0;  
}  
  
static void display_ubus_done(void)  
{  
    if (ctx)  
        ubus_free(ctx);  
}  
  
int main(int argc, char * argv[])  
{  
    char * path = NULL;  
      
    if (-1 == display_ubus_init(path))  
    {  
        printf("ubus connect failed!\n");  
        return -1;  
    }  
  
    server_ubus_send_event();  
  
    display_ubus_done();  
  
    return 0;  
}  

先运行client &注册事件。我们同时启动两个client程序。

再执行server主动触发”add_device”事件,则凡是注册了这个事件的client都会收到该事件并执行各自的处理。

root@NVR:~# ./server  
connected as fdecbdc1  
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }  
{ "add_device": { "major": 3, "minor": 56, "name": "mmc01" } }  

也可以通过命令行的ubus send命令触发事件:

root@NVR:~# ubus send "rm_device" '{ "major": 3, "minor": 56, "name": "mmc01" }'  
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }  
{ "rm_device": { "major": 3, "minor": 56, "name": "mmc01" } }  

在使用ubus时,可根据实际的场景来选择用哪种方式进行进程间通信。如之前所说, ubus是为发送消息而设计的,不合适传输大量数据。

文档信息
--------------
* 版权声明:自由转载-非商用
* 转载: []

落尘纷扰的专栏