心尊门 发表于 2019-10-5 13:27:32

EOS整体源码结构之nodeos和keosd

EOS是Block.One公司开发的一款超高性能的类操作系统区块链,其设计目标是:支持大量用户,可能是上亿级别的用户, 消除手续费(可以免费使用),超高性能(支持百万级TPS),具备横向和纵向性能扩展能力。本篇文章讲解EOS源码的整体结构中nodeos和keosd部分,后面会有一篇讲cleos的文章。

EOS由一些列的程序组成,最重要的是三个三个便是:nodeos,cleos,keosd

?nodeos (node + eos = nodeos), eos主节点程序?cleos (cli + eos = cleos), 命令行程序,用来作为管理发送命令来管理钱包和节点?keos (key + eos = keosd), 安全的存储keys的钱包组件 它们的协作关系如下图所示:



它们都在EOS中:

1.nodeos

1.1 整个程序入口
int main(int argc, char **argv){    try    {      //设定版本号      app().set_version(eosio::nodeos::config::version);
      //创建文件存储路径      auto root = fc::app_path();      app().set_default_data_dir(root / "eosio" / nodeos::config::node_executable_name / "data");      app().set_default_config_dir(root / "eosio" / nodeos::config::node_executable_name / "config");      //设定RPC监听endpoint      http_plugin::set_defaults({.default_unix_socket_path = "", .default_http_port = 8888});      //初始化最核心的三个插件      if (!app().initialize<chain_plugin, net_plugin, producer_plugin>(argc, argv))            return INITIALIZE_FAIL;      //初始化日志系统      initialize_logging();      ...      ilog("${name} using configuration file ${c}", ("name", nodeos::config::node_executable_name)( "c", app().full_config_file_path().string()));      ...      //启动程序      app().startup();      //维持程序一直运行不停下      app().exec();    }    catch (const extract_genesis_state_exception &e)    {      ....    }
    ilog("${name} successfully exiting", ("name", nodeos::config::node_executable_name));    return SUCCESS;}
1.2 nodeos application的实例:app()

在libraries/appbase/include/appbase/application.hpp中能看到app()是一个全局变量
namespace appbase{...class application{...};
//appbase唯一的全局变量application &app();
template <typename Impl>class plugin : public abstract_plugin{...};
template <typename Data, typename DispatchPolicy>void channel<Data, DispatchPolicy>::publish(int priority, const Data &data){...}
} // namespace appbase

看看这个唯一的全局变量干了什么
application &application::instance(){    static application _app;    return _app;}application &app() { return application::instance(); }
可以看到它是把application对象使用单例模式实例化了,我们简单看看application类的结构
classapplication{public:    ~application();
//配置处理方法voidset_version(uint64_t version);    ...    bfs::path full_config_file_path()const;
//处理起停方法voidset_sighup_callback(std::function<void()> callback);boolinitialize(int argc, char **argv){...}voidstartup();voidshutdown();voidexec();voidquit();boolis_quiting()const;    ...
//plugin处理方法abstract_plugin *find_plugin(conststring &name)const;abstract_plugin &get_plugin(conststring &name)const;auto ?ister_plugin(){...}Plugin *find_plugin()const{...}Plugin &get_plugin()const{...}
//绑定io_serv和pri_queue.wraptemplate <typename Func>autopost(int priority, Func &&func){return boost::asio::post(*io_serv, pri_queue.wrap(priority, std::forward<Func>(func)));    }
//其他    ...
private:    application();//管理plugin map<string, std::unique_ptr<abstract_plugin>> plugins;vector<abstract_plugin *> initialized_plugins; vector<abstract_plugin *> running_plugins;   ...//后面讲exe()用到std::shared_ptr<boost::asio::io_service> io_serv;    execution_priority_queue pri_queue;//application_impl 实例指针std::unique_ptr<classapplication_impl> my;};
可以看到里面除了application基本功能的方法,还有很多plugin相关和io_serv相关的内容。我们看它们是怎么用起来的。

1.3 plugin的使用从这里开始:app().initialize<...>(argc, argv),我们追查进去
//实现如下template <typename... Plugin>boolinitialize(int argc, char **argv){return initialize_impl(argc, argv, {find_plugin<Plugin>()...});}
//看find_plugin<Plugin>()...template <typename Plugin>Plugin *find_plugin()const{string name = boost::core::demangle(typeid(Plugin).name());returndynamic_cast<Plugin *>(find_plugin(name));}
//看find_plugin(name)abstract_plugin *application::find_plugin(conststring &name) const{auto itr = plugins.find(name);if (itr == plugins.end())    {returnnullptr;    }return itr->second.get();}
//还有initialize_implbool application::initialize_impl(int argc, char **argv, vector<abstract_plugin *> autostart_plugins){    set_program_options();    ...//这里从启动命令行或者配置文件中获取需要的pluginif (options.count("plugin") > 0)    {auto plugins = options.at("plugin").as<std::vector<std::string>>();for (auto &arg : plugins)      {vector<string> names;            boost::split(names, arg, boost::is_any_of(" \t,"));for (conststd::string &name : names)                get_plugin(name).initialize(options);      }    }try    {//这里是app().initialize<...>中传入的三个pluginfor (auto plugin : autostart_plugins)if (plugin != nullptr && plugin->get_state() == abstract_plugin::registered)                plugin->initialize(options);
      ...    }catch (...) {...}
returntrue;}
//看plugin.initialize(options),在plugin中virtualvoidinitialize(const variables_map &options) override{//_state的默认值设定的就是abstract_plugin::registeredif (_state == registered)    {      _state = initialized;static_cast<Impl *>(this)->plugin_requires([&](auto &plug) { plug.initialize(options); });static_cast<Impl *>(this)->plugin_initialize(options);      app().plugin_initialized(*this);    }    assert(_state == initialized);}
//看app().plugin_initialized(*this);voidplugin_initialized(abstract_plugin &plug){ initialized_plugins.push_back(&plug); }
由此我们可以看到app().initialize做的事情,就是找到需要的插件,然后注册注册插件依赖的插件,然后将它们存到appllication的initialized_plugins中

1.4启动application: app().startup(), 就是调用appllication的initialized_plugins每个plugin的startup方法,使得需要的plugin都运行起来
void application::startup(){    try    {      for (auto plugin : initialized_plugins)      {            if (is_quiting())                break;            plugin->startup();      }    }    catch (...){...}}
1.5 开始运行:app().exec();
void application::exec(){    boost::asio::io_service::work work(*io_serv);    (void)work;    bool more = true;    while (more || io_serv->run_one())    {      while (io_serv->poll_one())      {      }      // execute the highest priority item      more = pri_queue.execute_highest();    }
    shutdown(); /// perform synchronous shutdown    io_serv.reset();}
boost::asio::io_service::work使得io_serv不会执行完任务就立即退出,保证nodeos一直运行着,exec()里面还包含shutdown(),运行时,调用每个plugin的shutdown(),然后从aplication的pulgins中移除,然后清空running_plugins,initialized_plugins,plugins。最后调用io_serv.stop()

1.6 执行所有的任务: pri_queue.execute_highest();

nodeos使用io_serv和pri_queue,通过app().post(...)将任务添加进来,这些任务包含处理P2P信息,所有RPC请求,处理进来的交易等,包含了大部分nodeos要做的事情。我们抽取几个重要部分的代码。将实现过程展现出来,对于boost::asio::io_service不了解的同学还请自行查找资料学习,里面的东西很多。
//appbase/application.hpp中将io_serv和pri_queue绑定在一起,//将pri_queue中的任务通过app().post()传入io_serv中。//pri_queue是nodeos自己实现了一个优先级队列template <typename Func>    auto post(int priority, Func &&func)    {      return boost::asio::post(*io_serv, pri_queue.wrap(priority, std::forward<Func>(func)));    }
//pri_queue的实现过程//定义优先级别,咱们在之前的RPC运行讲解时就曾看到过struct priority{    static constexpr int high = 100;    static constexpr int medium = 50;    static constexpr int low = 10;};
//我们简取部分重要代码class execution_priority_queue : public boost::asio::execution_context{public:    //handlers_就是用来存储优先级和具体队列的地方,后面能看到定义    template <typename Function>    void add(int priority, Function function)    {      std::unique_ptr<queued_handler_base> handler(            //组建带有优先级的任务的queued_handler            new queued_handler<Function>(priority, --order_, std::move(function)));
      //添加新的带有优先级的任务(方法)      handlers_.push(std::move(handler));    }
    //执行所有任务    void execute_all()    {      while (!handlers_.empty())      {            handlers_.top()->execute();            handlers_.pop();      }    }
    //执行优先级最高的方法,app().exec()里面调用的就是这里    bool execute_highest()    {      if (!handlers_.empty())      {            handlers_.top()->execute();            handlers_.pop();      }
      return !handlers_.empty();    }
    //查看总共有多少任务    size_t size() { return handlers_.size(); }
    class executor    {    public:      executor(execution_priority_queue &q, int p)            : context_(q), priority_(p)      {      }
      execution_priority_queue &context() const noexcept      {            return context_;      }
      ...      template <typename Function, typename Allocator>      void post(Function f, const Allocator &) const      {            //添加带有优先级的任务(方法)            context_.add(priority_, std::move(f));      }      ...
      //比较优先级的重载      bool operator==(const executor &other) const noexcept      {            return &context_ == &other.context_ && priority_ == other.priority_;      }      ...
    private:      execution_priority_queue &context_;      int priority_;    };
    //这里就是绑定io_serv和pri_queue调用的方法    template <typename Function>    boost::asio::executor_binder<Function, executor>    wrap(int priority, Function &&func)    {      return boost::asio::bind_executor(executor(*this, priority), std::forward<Function>(func));    }
private:    class queued_handler_base    {    public:      ...
      //比较优先级的重载      friend bool operator<(const std::unique_ptr<queued_handler_base> &a,                              const std::unique_ptr<queued_handler_base> &b) noexcept      {            return std::tie(a->priority_, a->order_) < std::tie(b->priority_, b->order_);      }
    private:      int priority_;      size_t order_;    };
    template <typename Function>    class queued_handler : public queued_handler_base    {    public:      queued_handler(int p, size_t order, Function f)            : queued_handler_base(p, order), function_(std::move(f))      {      }
      //执行任务(方法)      void execute() override      {            function_();      }
    private:      Function function_;    };
    //保存所有任务和优先级的handlers_    std::priority_queue<std::unique_ptr<queued_handler_base>, std::deque<std::unique_ptr<queued_handler_base>>> handlers_;    std::size_t order_ = std::numeric_limits<size_t>::max(); // to maintain FIFO ordering in queue within priority};
所以nodeos所做的事情就是把plugin都启动起来,然后再启动io_serv,通过pri_queue完成这种各样的任务,这些任务执行时调用plugin,实现区块链状态的推进。

1.7 再来说一下plugin结构,每个plugin都继承于abstract_plugin,结构如下,从上面的讲解中能看出app()是怎么使用的
namespace appbase {   class abstract_plugin {      public:         enum state {            registered, ///< the plugin is constructed but doesn't do anything            initialized, ///< the plugin has initialized any state required but is idle            started, ///< the plugin is actively running            stopped ///< the plugin is no longer running         };
         virtual ~abstract_plugin(){}         virtual state get_state()const = 0;         virtual const std::string& name()const= 0;         virtual void set_program_options( options_description& cli, options_description& cfg ) = 0;         virtual void initialize(const variables_map& options) = 0;         virtual void handle_sighup() = 0;         virtual void startup() = 0;         virtual void shutdown() = 0;   };}
2.keosd

2.1 keosd的机构和nodeos的一样只是启动的时候用的是wallet_plugin, wallet_api_plugin和http_plugin
int main(int argc, char **argv){   try   {   ...      app().register_plugin<wallet_api_plugin>();      if (!app().initialize<wallet_plugin, wallet_api_plugin, http_plugin>(argc, argv))         return -1;       ...      app().startup();      app().exec();   }   catch (...){...}   return 0;}
好了,nodeos和keosd就先讲到这里了,欢迎关注本公众号,共同学习公链源码!也欢迎加入我们的微信群,共同学习交流



References

EOS: https://github.com/EOSIO/eos
页: [1]
查看完整版本: EOS整体源码结构之nodeos和keosd