10 #include "../derecho_type_definitions.hpp" 136 seq_num(num_subgroups),
137 delivered_num(num_subgroups),
138 persisted_num(num_subgroups),
139 suspected(parameters.
members.size()),
140 changes(100 + parameters.
members.size()),
141 joiner_ips(100 + parameters.
members.size()),
142 joiner_gms_ports(100 + parameters.
members.size()),
143 joiner_rpc_ports(100 + parameters.
members.size()),
144 joiner_sst_ports(100 + parameters.
members.size()),
145 joiner_rdmc_ports(100 + parameters.
members.size()),
146 num_received(num_received_size),
147 global_min(num_received_size),
148 global_min_ready(num_subgroups),
150 num_received_sst(num_received_size),
151 local_stability_frontier(num_subgroups) {
152 SSTInit(seq_num, delivered_num,
153 persisted_num, vid, suspected, changes, joiner_ips,
154 joiner_gms_ports, joiner_rpc_ports, joiner_sst_ports, joiner_rdmc_ports,
155 num_changes, num_committed, num_acked, num_installed,
156 num_received, wedged, global_min, global_min_ready,
157 slots, num_received_sst, local_stability_frontier, rip);
159 for(
unsigned int row = 0; row <
get_num_rows(); ++row) {
161 for(
size_t i = 0; i < suspected.
size(); ++i) {
162 suspected[row][i] =
false;
164 for(
size_t i = 0; i < changes.
size(); ++i) {
165 changes[row][i] =
false;
167 for(
size_t i = 0; i < global_min_ready.
size(); ++i) {
168 global_min_ready[row][i] =
false;
170 for(
size_t i = 0; i < global_min.
size(); ++i) {
171 global_min[row][i] = 0;
173 memset(const_cast<uint32_t*>(joiner_ips[row]), 0, joiner_ips.
size());
174 memset(const_cast<uint16_t*>(joiner_gms_ports[row]), 0, joiner_gms_ports.
size());
175 memset(const_cast<uint16_t*>(joiner_rpc_ports[row]), 0, joiner_rpc_ports.
size());
176 memset(const_cast<uint16_t*>(joiner_sst_ports[row]), 0, joiner_sst_ports.
size());
177 memset(const_cast<uint16_t*>(joiner_rdmc_ports[row]), 0, joiner_rdmc_ports.
size());
178 num_changes[row] = 0;
179 num_committed[row] = 0;
180 num_installed[row] = 0;
184 struct timespec start_time;
185 clock_gettime(CLOCK_REALTIME, &start_time);
186 auto current_time = start_time.tv_sec * 1e9 + start_time.tv_nsec;
187 for(
size_t i = 0; i < local_stability_frontier.
size(); ++i) {
188 local_stability_frontier[row][i] = current_time;
229 template <
typename Elem>
230 void set(
volatile Elem& e,
const Elem& value) {
232 std::atomic_signal_fence(std::memory_order_acq_rel);
241 template <
typename Elem>
242 void set(
volatile Elem& e,
volatile const Elem& value) {
244 std::atomic_signal_fence(std::memory_order_acq_rel);
256 template <
typename Elem>
257 void set(
volatile Elem* array,
volatile Elem* value,
const size_t length) {
258 static thread_local std::mutex set_mutex;
260 std::lock_guard<std::mutex> lock(set_mutex);
261 memcpy(const_cast<Elem*>(array), const_cast<Elem*>(value),
262 length *
sizeof(Elem));
264 std::atomic_signal_fence(std::memory_order_acq_rel);
275 template <
typename Arr,
size_t Len>
276 void set(
volatile Arr (&e)[Len],
const volatile Arr (&value)[Len]) {
277 static thread_local std::mutex set_mutex;
279 std::lock_guard<std::mutex> lock(set_mutex);
280 memcpy(
const_cast<Arr(&)[Len]
>(e),
const_cast<const Arr(&)[Len]
>(value),
287 std::atomic_signal_fence(std::memory_order_acq_rel);
298 template <
size_t L1,
size_t L2,
typename Arr>
299 void set(
volatile Arr (&dst)[L1],
const volatile Arr (&src)[L2],
const size_t& num) {
300 static thread_local std::mutex set_mutex;
302 std::lock_guard<std::mutex> lock(set_mutex);
303 memcpy(
const_cast<Arr(&)[L2]
>(dst),
const_cast<const Arr(&)[L1]
>(src),
306 std::atomic_signal_fence(std::memory_order_acq_rel);
309 void set(
volatile char* string_array,
const std::string& value);
313 bool equals(
const volatile char& string_array,
const std::string& value);
SSTFieldVector< bool > global_min_ready
Array indicating whether each shard leader (indexed by subgroup number) has published a global_min fo...
SSTField< int > num_changes
How many changes to the view have been proposed.
SSTField< bool > wedged
Set after calling rdmc::wedged(), reports that this member is wedged.
SSTFieldVector< uint16_t > joiner_gms_ports
joiner_xxx_ports are the port numbers for the joining nodes.
SSTFieldVector< uint16_t > joiner_sst_ports
unsigned int get_num_rows() const
Returns the total number of rows in the table.
DerechoSST(const sst::SSTParams ¶meters, uint32_t num_subgroups, uint32_t num_received_size, uint64_t slot_size)
Constructs an SST, and initializes the GMS fields to "safe" initial values (0, false, etc.).
SSTFieldVector< char > slots
for SST multicast
SSTField< int32_t > vid
View ID associated with this SST.
SSTFieldVector< message_id_t > seq_num
Sequence numbers are interpreted like a row-major pair: (sender, index) becomes sender + num_members ...
void increment(volatile int &member)
Thread-safe increment of an integer member of GMSTableRow; ensures there is a std::atomic_signal_fenc...
SSTFieldVector< persistent::version_t > persisted_num
This represents the highest persistent version number that has been persisted to disk at this node...
std::string to_string() const
Creates a string representation of the local row (not the whole table).
SSTField< int > num_acked
How many proposed changes have been seen.
Clients should use instances of this class with the appropriate template parameter to declare fields ...
void init_local_row_from_previous(const DerechoSST &old_sst, const int row, const int num_changes_installed)
Initializes the local row of this SST based on the specified row of the previous View's SST...
SST(DerechoSST *derived_class_pointer, const SSTParams ¶ms)
SSTFieldVector< message_id_t > delivered_num
This represents the highest sequence number that has been delivered at this node. ...
Clients should use instances of this class to declare vector-like fields in their SST; the template p...
const std::vector< uint32_t > & members
List of nodes in the SST; indexes are row numbers, values are node IDs.
SSTField< int > num_installed
How many previously proposed view changes have been installed in the current view.
SSTFieldVector< uint16_t > joiner_rdmc_ports
SSTFieldVector< int32_t > num_received_sst
SSTFieldVector< int32_t > num_received
Local count of number of received messages by sender.
size_t size() const
Just like std::vector::size(), returns the number of elements in this vector.
SSTFieldVector< uint64_t > local_stability_frontier
to check for failures - used by the thread running check_failures_loop in derecho_group ...
SSTField< bool > rip
to signal a graceful exit
The GMS and derecho_group will share the same SST for efficiency.
Constructor parameter pack for SST.
void init_local_change_proposals(const int other_row)
Copies currently proposed changes and the various counter values associated with them to the local ro...
SSTFieldVector< uint32_t > joiner_ips
If changes[i] is a Join, joiner_ips[i] is the IP address of the joining node, packed into an unsigned...
SSTFieldVector< int > global_min
Array of how many messages to accept from each sender in the current view change. ...
bool equals(const volatile char &string_array, const std::string &value)
void SSTInit(Fields &... fields)
SSTFieldVector< uint16_t > joiner_rpc_ports
SSTFieldVector< node_id_t > changes
An array of the same length as View::members, containing a list of proposed changes to the view that ...
SSTField< int > num_committed
How many proposed view changes have reached the commit point.
SSTFieldVector< bool > suspected
Array of same length as View::members, where each bool represents whether the corresponding member is...