Derecho  0.9
Distributed systems toolkit for RDMA
util.cpp
Go to the documentation of this file.
2 
3 #include <cassert>
4 #include <cinttypes>
5 #include <cmath>
6 #include <cstdint>
7 #include <cstring>
8 #include <iostream>
9 #include <numeric>
10 #include <sstream>
11 #include <sys/stat.h>
12 #include <thread>
13 
14 #ifdef USE_SLURM
15 #include <slurm/slurm.h>
16 #endif
17 
18 using namespace std;
19 
// REALTIME timestamp taken on the master node once every other node has
// connected. It stays at zero until reset_epoch() stores get_time() here;
// flush_events() subtracts it from each event's timestamp when printing.
static uint64_t epoch_start{0};
24 
// Converts `u` to type T by writing it to a string stream with operator<<
// and reading it back with operator>>. T must be default-constructible.
// If the extraction fails, the value left in the T by operator>> is returned
// (no error is reported). Extraction stops at the first whitespace.
template <class T, class U>
T lexical_cast(U u) {
    std::stringstream converter;
    converter << u;

    T parsed{};
    converter >> parsed;
    return parsed;
}
33 
// Returns true when stat() succeeds on `name`, false otherwise. Any stat()
// failure (not only "no such file") is reported as false.
bool file_exists(const std::string &name) {
    struct stat info;
    const int status = stat(name.c_str(), &info);
    return status == 0;
}
38 
// Calls mkdir() on `name`, requesting read/write/execute permission for
// user, group and others. The return value of mkdir() is not inspected, so
// failures are silently ignored.
void create_directory(const std::string &name) {
    const mode_t requested_mode = S_IRWXU | S_IRWXG | S_IRWXO;
    mkdir(name.c_str(), requested_mode);
}
42 
// Returns the number of bits in `num_bytes` divided by the elapsed time
// (eTime - sTime). With timestamps in nanoseconds this is bits/nanosecond,
// i.e. Gigabits/second.
// The elapsed time is computed in unsigned arithmetic: eTime == sTime divides
// by zero, and eTime < sTime wraps around to a very large interval.
double compute_data_rate(size_t num_bytes, uint64_t sTime, uint64_t eTime) {
    const double total_bits = static_cast<double>(num_bytes) * 8.0;
    const uint64_t elapsed = eTime - sTime;
    return total_bits / elapsed;
}
// Writes `str` followed by a newline to stdout and flushes stdout
// immediately.
void put_flush(const char *str) {
    // Timestamped variant kept for reference:
    // printf("[%6.3f]%s\n", 1.0e-6 * (get_time() - epoch_start), str);
    std::puts(str);
    std::fflush(stdout);
}
52 
// Attempts to initialise the environment from SLURM and returns whether it
// was successful.
//
// When built with USE_SLURM, reads SLURM_NODEID, SLURM_NNODES and
// SLURM_JOB_NODELIST from the environment, expands the node list and stores
// one entry per host in `addresses`, keyed 0, 1, 2, ... in list order.
// `node_rank` is set from SLURM_NODEID. Returns false (leaving both outputs
// untouched) if any variable is missing or the host list cannot be created.
//
// When built without USE_SLURM, always returns false and modifies nothing.
bool slurm_query_addresses(std::map<uint32_t, std::string> &addresses,
                           uint32_t &node_rank) {
#ifdef USE_SLURM
    char *nodeid_ptr = getenv("SLURM_NODEID");
    char *nnodes_ptr = getenv("SLURM_NNODES");
    char *hostnames = getenv("SLURM_JOB_NODELIST");
    if(!nodeid_ptr || !nnodes_ptr || !hostnames) return false;

    hostlist_t hostlist = slurm_hostlist_create(hostnames);
    if(!hostlist) return false;

    char *host;
    uint32_t i = 0;
    while((host = slurm_hostlist_shift(hostlist))) {
        // emplace copies the name into a std::string; the buffer returned by
        // slurm_hostlist_shift is owned by the caller and must be freed.
        addresses.emplace(i++, host);
        free(host);
    }

    slurm_hostlist_destroy(hostlist);

    node_rank = lexical_cast<uint32_t>(std::string(nodeid_ptr));
    uint32_t num_nodes = lexical_cast<uint32_t>(std::string(nnodes_ptr));

    // Sanity checks only; these vanish when NDEBUG is defined.
    assert(addresses.size() == num_nodes);
    assert(node_rank < num_nodes);
    return true;
#else
    return false;
#endif
}
84 
85 void query_peer_addresses(map<uint32_t, string> &addresses,
86  uint32_t &node_rank) {
87  if(slurm_query_addresses(addresses, node_rank)) return;
88 
89  uint32_t num_nodes;
90  cout << "Please enter '[node_id] [num_nodes]': ";
91  cin >> node_rank >> num_nodes;
92 
93  string addr;
94  for(uint32_t i = 0; i < num_nodes; ++i) {
95  // input the connection information here
96  cout << "Please enter IP Address for node " << i << ": ";
97  cin >> addr;
98  addresses.emplace(i, addr);
99  }
100 }
101 
102 void reset_epoch() {
103  LOG_EVENT(-1, -1, -1, "begin_epoch_reset");
104  epoch_start = get_time();
105  LOG_EVENT(-1, -1, -1, "end_epoch_reset");
106 }
107 
// Returns the arithmetic mean of the values in `v`.
// For an empty vector the result is 0.0 / 0, i.e. NaN.
double compute_mean(std::vector<double> v) {
    const double sum = std::accumulate(v.begin(), v.end(), 0.0);
    return sum / v.size();
}

// Returns the population standard deviation of the values in `v`, computed
// as sqrt(E[x^2] - E[x]^2).
// For an empty vector the result is NaN, as for compute_mean().
double compute_stddev(std::vector<double> v) {
    const double mean = compute_mean(v);
    const double sq_sum = std::inner_product(v.begin(), v.end(), v.begin(), 0.0);
    const double variance = sq_sum / v.size() - mean * mean;
    // Rounding can push the difference slightly below zero when the values
    // are (nearly) identical; clamp so sqrt() does not turn that into NaN.
    // A NaN variance (empty input) is deliberately passed through unchanged.
    return std::sqrt(variance < 0.0 ? 0.0 : variance);
}
117 
118 vector<event> events;
119 std::mutex events_mutex;
121  auto flush_server = []() {
122  while(true) {
123  flush_events();
124  this_thread::sleep_for(chrono::seconds(10));
125  }
126  };
127  thread t(flush_server);
128  t.detach();
129 }
130 void flush_events() {
131  std::unique_lock<std::mutex> lock(events_mutex);
132 
133  auto basename = [](const char *path) {
134  const char *base = strrchr(path, '/');
135  return base ? base + 1 : path;
136  };
137 
138  static bool print_header = true;
139  if(print_header) {
140  printf(
141  "time, file:line, event_name, group_number, message_number, "
142  "block_number\n");
143  print_header = false;
144  }
145  for(const auto &e : events) {
146  if(e.group_number == (uint32_t)(-1)) {
147  printf("%5.6f, %s:%d, %s\n", 1.0e-6 * (e.time - epoch_start),
148  basename(e.file), e.line, e.event_name);
149 
150  } else if(e.message_number == (size_t)(-1)) {
151  printf("%5.6f, %s:%d, %s, %" PRIu32 "\n",
152  1.0e-6 * (e.time - epoch_start), basename(e.file), e.line,
153  e.event_name, e.group_number);
154 
155  } else if(e.block_number == (size_t)(-1)) {
156  printf("%5.6f, %s:%d, %s, %" PRIu32 ", %zu\n",
157  1.0e-6 * (e.time - epoch_start), basename(e.file), e.line,
158  e.event_name, e.group_number, e.message_number);
159 
160  } else {
161  printf("%5.6f, %s:%d, %s, %" PRIu32 ", %zu, %zu\n",
162  1.0e-6 * (e.time - epoch_start), basename(e.file), e.line,
163  e.event_name, e.group_number, e.message_number,
164  e.block_number);
165  }
166  }
167  fflush(stdout);
168  events.clear();
169 }
vector< event > events
Definition: util.cpp:118
bool file_exists(const string &name)
Definition: util.cpp:34
T lexical_cast(U u)
Definition: util.cpp:26
uint64_t get_time()
Definition: time.h:13
double compute_data_rate(size_t num_bytes, uint64_t sTime, uint64_t eTime)
Definition: util.cpp:44
T mean
Definition: experiment.cpp:30
#define LOG_EVENT(group_number, message_number, block_number, event_name)
Definition: util.hpp:64
STL namespace.
void flush_events()
Definition: util.cpp:130
std::mutex events_mutex
Definition: util.cpp:119
uint32_t node_rank
Definition: experiment.cpp:45
void query_peer_addresses(map< uint32_t, string > &addresses, uint32_t &node_rank)
Definition: util.cpp:85
void put_flush(const char *str)
Definition: util.cpp:47
bool slurm_query_addresses(map< uint32_t, string > &addresses, uint32_t &node_rank)
Definition: util.cpp:55
void start_flush_server()
Definition: util.cpp:120
double compute_mean(std::vector< double > v)
Definition: util.cpp:108
void create_directory(const string &name)
Definition: util.cpp:39
double compute_stddev(std::vector< double > v)
Definition: util.cpp:112
void reset_epoch()
Definition: util.cpp:102
uint64_t addr
Buffer address.
Definition: verbs.hpp:288
uint32_t num_nodes
Definition: experiment.cpp:46