10 #include <condition_variable> 20 #include "../predicates.hpp" 29 template <
typename DerivedSST>
31 thread_shutdown =
true;
32 for(
auto& thread : background_threads) {
33 if(thread.joinable()) thread.join();
37 delete[](
const_cast<char*
>(rows));
46 template <
typename DerivedSST>
48 std::lock_guard<std::mutex> lock(thread_start_mutex);
50 thread_start_cv.notify_all();
60 template <
typename DerivedSST>
62 pthread_setname_np(pthread_self(),
"sst_detect");
64 std::unique_lock<std::mutex> lock(thread_start_mutex);
65 thread_start_cv.wait(lock, [
this]() {
return thread_start; });
67 struct timespec last_time, cur_time;
68 clock_gettime(CLOCK_REALTIME, &last_time);
70 while(!thread_shutdown) {
71 bool predicate_fired =
false;
73 std::unique_lock<std::mutex> predicates_lock(predicates.predicate_mutex);
76 for(
auto& pred : predicates.one_time_predicates) {
77 if(pred !=
nullptr && (pred->first(*derived_this) ==
true)) {
78 predicate_fired =
true;
81 std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(pred->second);
82 predicates_lock.unlock();
83 (*trigger)(*derived_this);
84 predicates_lock.lock();
91 for(
auto& pred : predicates.recurrent_predicates) {
92 if(pred !=
nullptr && (pred->first(*derived_this) ==
true)) {
93 predicate_fired =
true;
94 std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(pred->second);
95 predicates_lock.unlock();
96 (*trigger)(*derived_this);
97 predicates_lock.lock();
103 auto pred_it = predicates.transition_predicates.begin();
104 auto pred_state_it = predicates.transition_predicate_states.begin();
105 while(pred_it != predicates.transition_predicates.end()) {
106 if(*pred_it !=
nullptr) {
108 bool curr_pred_state = (*pred_it)->first(*derived_this);
109 if(curr_pred_state ==
true && *pred_state_it ==
false) {
110 predicate_fired =
true;
111 std::shared_ptr<typename Predicates<DerivedSST>::trig> trigger(
113 predicates_lock.unlock();
114 (*trigger)(*derived_this);
115 predicates_lock.lock();
117 *pred_state_it = curr_pred_state;
124 if(predicate_fired) {
126 clock_gettime(CLOCK_REALTIME, &last_time);
128 clock_gettime(CLOCK_REALTIME, &cur_time);
130 double time_elapsed_in_ms = (cur_time.tv_sec - last_time.tv_sec) * 1e3
131 + (cur_time.tv_nsec - last_time.tv_nsec) / 1e6;
132 if(time_elapsed_in_ms > 1) {
133 predicates_lock.unlock();
135 std::this_thread::sleep_for(1ms);
136 predicates_lock.lock();
143 template <
typename DerivedSST>
145 assert(offset + size <= rowLen);
146 for(
auto index : receiver_ranks) {
148 if(index == my_index || row_is_frozen[index]) {
152 res_vec[index]->post_remote_write(offset, size);
157 template <
typename DerivedSST>
159 assert(offset + size <= rowLen);
160 unsigned int num_writes_posted = 0;
161 std::vector<bool> posted_write_to(num_members,
false);
163 const auto tid = std::this_thread::get_id();
173 for(
auto index : receiver_ranks) {
175 if(index == my_index || row_is_frozen[index]) {
180 sctxt[index].
ce_idx = ce_idx;
181 res_vec[index]->post_remote_write_with_completion(&sctxt[index], offset, size);
182 posted_write_to[index] =
true;
187 std::vector<bool> polled_successfully_from(num_members,
false);
189 std::vector<uint32_t> failed_node_indexes;
191 unsigned long start_time_msec;
192 unsigned long cur_time_msec;
193 struct timeval cur_time;
196 gettimeofday(&cur_time, NULL);
197 start_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
200 for(
unsigned int index = 0; index < num_writes_posted; ++index) {
201 std::optional<std::pair<int32_t, int32_t>> ce;
209 gettimeofday(&cur_time, NULL);
210 cur_time_msec = (cur_time.tv_sec * 1000) + (cur_time.tv_usec / 1000);
211 if((cur_time_msec - start_time_msec) >= poll_cq_timeout_ms) {
218 for(
unsigned int index2 = 0; index2 < num_members; ++index2) {
219 if(!posted_write_to[index2] || polled_successfully_from[index2]) {
222 failed_node_indexes.push_back(index2);
227 auto ce_v = ce.value();
228 int remote_id = ce_v.first;
229 int result = ce_v.second;
231 polled_successfully_from[remote_id] =
true;
232 }
else if(result == -1) {
233 if(!row_is_frozen[index]) {
234 failed_node_indexes.push_back(remote_id);
241 for(
auto index : failed_node_indexes) {
246 template <
typename DerivedSST>
249 std::lock_guard<std::mutex> lock(freeze_mutex);
250 if(row_is_frozen[row_index]) {
253 row_is_frozen[row_index] =
true;
260 failure_upcall(members[row_index]);
271 template <
typename DerivedSST>
273 unsigned int node_id, sst_index;
274 for(
auto const& id_index : members_by_id) {
275 std::tie(node_id, sst_index) = id_index;
276 if(sst_index != my_index && !row_is_frozen[sst_index]) {
285 template <
typename DerivedSST>
287 for(
auto const& row_index : row_indices) {
288 if(row_index == my_index) {
291 if(!row_is_frozen[row_index]) {
292 sync(members[row_index]);
uint32_t get_index(const std::thread::id id)
void set_waiting(const std::thread::id id)
void reset_waiting(const std::thread::id id)
~SST()
Destructor for the SST object; sets thread_shutdown to true and waits for background threads to exit.
void sync_with_members() const
Does a TCP sync with each member of the SST.
bool sync(uint32_t r_id)
Blocks the current thread until both this node and a remote node reach this function, which exchanges some trivial data over a TCP connection.
void put_with_completion()
void freeze(int row_index)
Marks a row as frozen, so it will no longer update, and its corresponding node will not receive writes.
void detect()
This function is run in a detached background thread to detect predicate events.
void start_predicate_evaluation()
Starts the predicate evaluation loop.
std::optional< std::pair< int32_t, int32_t > > get_completion_entry(const std::thread::id id)
void put()
Writes the entire local row to all remote nodes.