8 #include <spdlog/async.h> 9 #include <spdlog/sinks/rotating_file_sink.h> 10 #include <spdlog/sinks/stdout_color_sinks.h> 12 #include "../group.hpp" 20 template <
typename SubgroupType>
23 return gptr->get_subgroup(subgroup_num);
25 throw derecho_exception(
"Error: this top-level group contains no subgroups for the selected type.");
28 template <
typename SubgroupType>
31 return gptr->get_nonmember_subgroup(subgroup_num);
33 throw derecho_exception(
"Error: this top-level group contains no subgroups for the selected type.");
36 template <
typename SubgroupType>
39 return gptr->get_subgroup_members(subgroup_index);
41 throw derecho_exception(
"Error: this top-level group contains no subgroups for the selected type.");
44 template <
typename SubgroupType>
47 return gptr->get_number_of_shards(subgroup_index);
49 throw derecho_exception(
"Error: this top-level group contains no subgroups for the selected type.");
52 template <
typename ReplicatedType>
56 set_replicated_pointer(std::type_index{
typeid(ReplicatedType)}, subgroup_num,
61 template <
typename ReplicatedType>
65 set_external_caller_pointer(std::type_index{
typeid(ReplicatedType)}, subgroup_num,
70 template <
typename ReplicatedType>
71 std::vector<std::vector<node_id_t>>
73 return get_view_manager().get_subgroup_members(
get_index_of_type(
typeid(ReplicatedType)), subgroup_index);
76 template <
typename ReplicatedType>
79 return get_view_manager().get_number_of_shards_in_subgroup(
get_index_of_type(
typeid(ReplicatedType)), subgroup_index);
82 template <
typename... ReplicatedTypes>
84 uint32_t subgroup_num,
86 ((*ret = (type == std::type_index{
typeid(ReplicatedTypes)}
87 ? &get_subgroup<ReplicatedTypes>(subgroup_num)
92 template <
typename... ReplicatedTypes>
94 assert_always((std::type_index{ti} == std::type_index{
typeid(ReplicatedTypes)} || ... ||
false));
95 return (((std::type_index{ti} == std::type_index{
typeid(ReplicatedTypes)}) ?
102 template <
typename... ReplicatedTypes>
107 template <
typename... ReplicatedTypes>
109 uint32_t subgroup_num,
111 ((*ret = (type == std::type_index{
typeid(ReplicatedTypes)}
112 ? &get_nonmember_subgroup<ReplicatedTypes>(subgroup_num)
118 template <
typename... ReplicatedTypes>
122 std::vector<view_upcall_t> _view_upcalls,
127 leader_connection([&]() ->
std::optional<
tcp::socket> {
128 if(!is_starting_leader) {
133 user_deserialization_context(deserialization_context),
134 persistence_manager(objects_by_subgroup_id, callbacks.local_persistence_callback),
138 if(is_starting_leader) {
140 {std::type_index(
typeid(ReplicatedTypes))...},
141 std::disjunction_v<has_persistent_fields<ReplicatedTypes>...>,
142 tcp_sockets, objects_by_subgroup_id,
143 persistence_manager.get_callbacks(),
148 {std::type_index(
typeid(ReplicatedTypes))...},
149 std::disjunction_v<has_persistent_fields<ReplicatedTypes>...>,
150 tcp_sockets, objects_by_subgroup_id,
151 persistence_manager.get_callbacks(),
155 rpc_manager(view_manager, deserialization_context),
158 bool initial_view_confirmed =
false;
159 while(!initial_view_confirmed) {
163 const vector_int64_2d& old_shard_leaders = view_manager.get_old_shard_leaders();
166 std::set<std::pair<subgroup_id_t, node_id_t>> subgroups_and_leaders_to_receive
167 = construct_objects<ReplicatedTypes...>(view_manager.get_current_view_const().get(),
170 view_manager.truncate_logs();
171 view_manager.send_logs();
172 receive_objects(subgroups_and_leaders_to_receive);
173 if(is_starting_leader) {
174 bool leader_has_quorum =
true;
175 initial_view_confirmed = view_manager.leader_prepare_initial_view(leader_has_quorum);
176 if(!leader_has_quorum) {
179 view_manager.await_rejoining_nodes(my_id);
183 initial_view_confirmed = view_manager.check_view_committed(leader_connection.value());
186 if(is_starting_leader) {
189 view_manager.leader_commit_initial_view();
192 view_manager.initialize_multicast_groups(callbacks);
193 rpc_manager.create_connections();
196 view_manager.finish_setup();
198 rpc_manager.start_listening();
199 view_manager.start();
200 persistence_manager.start();
204 template <
typename... ReplicatedTypes>
206 :
Group({}, subgroup_info,
nullptr, {}, factories...) {}
208 template <
typename... ReplicatedTypes>
214 persistence_manager.shutdown(
true);
215 tcp_sockets->destroy();
218 template <
typename... ReplicatedTypes>
219 template <
typename FirstType,
typename... RestTypes>
221 const View& curr_view,
223 std::set<std::pair<subgroup_id_t, uint32_t>> subgroups_to_receive;
225 return subgroups_to_receive;
230 for(uint32_t subgroup_index = 0; subgroup_index < subgroup_ids.size(); ++subgroup_index) {
233 bool in_subgroup =
false;
235 for(uint32_t shard_num = 0; shard_num < num_shards; ++shard_num) {
236 const std::vector<node_id_t>& members = curr_view.
subgroup_shard_views.at(subgroup_id).at(shard_num).members;
238 if(std::find(members.begin(), members.end(), my_id) != members.end()) {
242 auto old_object = replicated_objects.template get<FirstType>().find(subgroup_index);
243 if(old_object != replicated_objects.template get<FirstType>().end() && old_object->second.get_shard_num() != shard_num) {
244 dbg_default_debug(
"Deleting old Replicated Object state for type {}; I was reassigned from shard {} to shard {}",
245 typeid(FirstType).name(), old_object->second.get_shard_num(), shard_num);
247 objects_by_subgroup_id.erase(subgroup_id);
248 replicated_objects.template get<FirstType>().erase(old_object);
251 if(replicated_objects.template get<FirstType>().count(subgroup_index) == 0) {
253 bool has_previous_leader = old_shard_leaders.size() > subgroup_id
254 && old_shard_leaders[subgroup_id].size() > shard_num
255 && old_shard_leaders[subgroup_id][shard_num] > -1
256 && old_shard_leaders[subgroup_id][shard_num] != my_id;
257 if(has_previous_leader) {
258 subgroups_to_receive.emplace(subgroup_id, old_shard_leaders[subgroup_id][shard_num]);
263 replicated_objects.template get<FirstType>().emplace(
265 subgroup_id, subgroup_index,
266 shard_num, rpc_manager,
this));
268 replicated_objects.template get<FirstType>().emplace(
270 subgroup_id, subgroup_index, shard_num, rpc_manager,
271 factories.template get<FirstType>(),
this));
274 objects_by_subgroup_id.emplace(subgroup_id,
275 replicated_objects.template get<FirstType>().at(subgroup_index));
282 auto old_object = replicated_objects.template get<FirstType>().find(subgroup_index);
283 if(old_object != replicated_objects.template get<FirstType>().end()) {
284 dbg_default_debug(
"Deleting old Replicated Object state (of type {}) for subgroup {} because this node is no longer a member",
285 typeid(FirstType).name(), subgroup_index);
286 objects_by_subgroup_id.erase(subgroup_id);
287 replicated_objects.template get<FirstType>().erase(old_object);
290 external_callers.template get<FirstType>().emplace(
292 my_id, subgroup_id, rpc_manager));
295 return functional_insert(subgroups_to_receive, construct_objects<RestTypes...>(curr_view, old_shard_leaders));
298 template <
typename... ReplicatedTypes>
301 persistence_manager.set_view_manager(view_manager);
304 curr_view.
get().multicast_group->register_rpc_callback([
this](
subgroup_id_t subgroup,
node_id_t sender,
char* buf, uint32_t size) {
305 rpc_manager.rpc_message_handler(subgroup, sender, buf, size);
307 view_manager.add_view_upcall([
this](
const View& new_view) {
308 rpc_manager.new_view_callback(new_view);
312 view_manager.register_initialize_objects_upcall([
this](
node_id_t my_id,
const View& view,
314 std::set<std::pair<subgroup_id_t, node_id_t>> subgroups_and_leaders
315 = construct_objects<ReplicatedTypes...>(view, old_shard_leaders);
316 receive_objects(subgroups_and_leaders);
320 template <
typename... ReplicatedTypes>
321 template <
typename SubgroupType>
323 if(!view_manager.get_current_view().get().is_adequately_provisioned) {
327 return replicated_objects.template get<SubgroupType>().at(subgroup_index);
328 }
catch(std::out_of_range& ex) {
333 template <
typename... ReplicatedTypes>
334 template <
typename SubgroupType>
337 return external_callers.template get<SubgroupType>().at(subgroup_index);
338 }
catch(std::out_of_range& ex) {
339 throw invalid_subgroup_exception(
"No ExternalCaller exists for the requested subgroup; this node may be a member of the subgroup");
343 template <
typename... ReplicatedTypes>
344 template <
typename SubgroupType>
347 auto& EC = external_callers.template get<SubgroupType>().at(subgroup_index);
348 View& curr_view = view_manager.get_current_view().get();
352 std::vector<node_id_t> shard_reps(shard_subviews.size());
353 for(uint i = 0; i < shard_subviews.size(); ++i) {
355 shard_reps[i] = shard_subviews[i].members.at(0);
358 }
catch(std::out_of_range& ex) {
359 throw invalid_subgroup_exception(
"No ExternalCaller exists for the requested subgroup; this node may be a member of the subgroup");
363 template <
typename... ReplicatedTypes>
366 for(
const auto& subgroup_and_leader : subgroups_and_leaders) {
368 = tcp_sockets->get_socket(subgroup_and_leader.second);
369 ReplicatedObject& subgroup_object = objects_by_subgroup_id.at(subgroup_and_leader.first);
373 log_tail_length, subgroup_and_leader.first, subgroup_and_leader.second);
374 leader_socket.
get().write(log_tail_length);
376 dbg_default_debug(
"Receiving Replicated Object state for subgroup {} from node {}",
377 subgroup_and_leader.first, subgroup_and_leader.second);
378 std::size_t buffer_size;
379 bool success = leader_socket.
get().read(buffer_size);
381 char* buffer =
new char[buffer_size];
382 success = leader_socket.
get().read(buffer, buffer_size);
387 dbg_default_debug(
"Done receiving all Replicated Objects from subgroup leaders");
390 template <
typename... ReplicatedTypes>
395 template <
typename... ReplicatedTypes>
398 view_manager.silence();
401 view_manager.leave();
404 template <
typename... ReplicatedTypes>
409 template <
typename... ReplicatedTypes>
410 template <
typename SubgroupType>
414 template <
typename... ReplicatedTypes>
415 template <
typename SubgroupType>
417 return view_manager.
get_my_shard(index_of_type<SubgroupType, ReplicatedTypes...>, subgroup_index);
420 template <
typename... ReplicatedTypes>
425 template <
typename... ReplicatedTypes>
430 template <
typename... ReplicatedTypes>
435 template <
typename... ReplicatedTypes>
uint32_t subgroup_id_t
Type alias for the internal Subgroup IDs generated by ViewManager.
The Deserialization Interface to be implemented by user applications.
auto & get_subgroup(uint32_t subgroup_num=0)
void barrier_sync()
Waits until all members of the group have called this function.
mutils::KindMap< Factory, Types... > make_kind_map(Factory< Types >... factories)
Constructs a KindMap<Factory, Types...> from a list of factories of those types.
The top-level object for creating a Derecho group.
std::vector< std::vector< int64_t > > vector_int64_2d
Type of a 2-dimensional vector used to store potential node IDs, or -1.
std::vector< std::vector< SubView > > subgroup_shard_views
Maps subgroup ID -> shard number -> SubView for that subgroup/shard.
ViewManager & get_view_manager() override
const uint16_t getConfUInt16(const std::string &key)
void debug_print_status() const
uint32_t subgroup_type_id_t
Type of the numeric ID used to refer to subgroup types within a Group; this is currently computed as ...
A little helper class that wraps together a reference and a lock on a related mutex.
std::set< T > functional_insert(std::set< T > &a, const std::set< T > &b)
Inserts set b into set a and returns the modified a.
std::is_base_of< PersistsFields, T > has_persistent_fields
A template whose member field "value" will be true if type T inherits from PersistsFields, and false otherwise.
const std::string & getConfString(const std::string &key)
void set_replicated_pointer(std::type_index type, uint32_t subgroup_num, void **ret)
Group(const CallbackSet &callbacks, const SubgroupInfo &subgroup_info, IDeserializationContext *deserialization_context, std::vector< view_upcall_t > _view_upcalls={}, Factory< ReplicatedTypes >... factories)
Constructor that starts a new managed Derecho group with this node as the leader. ...
const uint32_t getConfUInt32(const std::string &key)
ExternalCaller< ReplicatedType > & get_nonmember_subgroup(uint32_t subgroup_index=0)
std::vector< std::vector< node_id_t > > get_subgroup_members(uint32_t subgroup_index=0)
#define dbg_default_debug(...)
std::size_t get_number_of_shards(uint32_t subgroup_index=0)
auto & get_nonmember_subgroup(uint32_t subgroup_num=0)
void set(volatile Elem &e, const Elem &value)
Thread-safe setter for DerechoSST members; ensures there is a std::atomic_signal_fence after writing ...
bool is_adequately_provisioned
Set to false during MulticastGroup setup if a subgroup membership function throws a subgroup_provisio...
An exception that indicates that a subgroup membership function was unable to finish executing becaus...
std::map< subgroup_type_id_t, std::vector< subgroup_id_t > > subgroup_ids_by_type_id
Maps the (type, index) pairs used by users to identify subgroups to the internal subgroup IDs generat...
Replicated< ReplicatedType > & get_subgroup(uint32_t subgroup_num=0)
#define CONF_DERECHO_LEADER_GMS_PORT
virtual std::size_t receive_object(char *buffer)=0
constexpr uint32_t index_of_type
A compile-time "function" that computes the index of a type within a template parameter pack of types...
virtual uint32_t get_index_of_type(const std::type_info &)=0
std::size_t get_number_of_shards(uint32_t subgroup_index=0)
#define CONF_DERECHO_LOCAL_IP
Common interface for all types of Replicated<T>, specifying some methods for state transfer and persi...
std::function< std::unique_ptr< T >(persistent::PersistentRegistry *)> Factory
The type of factory function the user must provide to the Group constructor, to construct each Replic...
#define CONF_DERECHO_RPC_PORT
Bundles together a set of callback functions for message delivery events.
#define CONF_DERECHO_GMS_PORT
uint32_t node_id_t
Type alias for Node IDs in a Derecho group.
std::vector< std::vector< node_id_t > > get_subgroup_members(uint32_t subgroup_index=0)
std::int32_t get_my_rank()
Returns the order of this node in the sequence of members of the group.
#define assert_always(x...)
Container for whatever information is needed to describe a Group's subgroups and shards.
#define CONF_DERECHO_LOCAL_ID
uint32_t get_index_of_type(const std::type_info &) override
void report_failure(const node_id_t who)
Reports to the GMS that the given node has failed.
std::vector< node_id_t > get_members()
Returns a vector listing the nodes that are currently members of the group.
virtual const persistent::version_t get_minimum_latest_persisted_version() noexcept(false)=0
Base exception class for all exceptions raised by Derecho.
#define CONF_DERECHO_LEADER_IP
virtual bool is_persistent() const =0
std::int32_t get_my_shard(uint32_t subgroup_index=0)
Returns the shard number that this node is a member of in the specified subgroup (by subgroup type an...
void set_external_caller_pointer(std::type_index type, uint32_t subgroup_num, void **ret)
Exception that means the user made an invalid request for a subgroup handle, such as by supplying an ...