Derecho  0.9
Distributed systems toolkit for RDMA
test.cpp
Go to the documentation of this file.
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>

#include <exception>
#include <iostream>
#include <memory>
#include <string>

#include <spdlog/spdlog.h>
12 
13 using namespace persistent;
14 using namespace mutils;
15 using std::cout;
16 using std::endl;
17 using std::cerr;
18 
/**
 * Signal handler that main() installs for SIGSEGV.
 *
 * Writes "I received signal:<num>.\n" to standard output and then re-raises
 * the signal with its default disposition so the process terminates.
 *
 * The message is formatted by hand and emitted with write(2) because printf()
 * is not async-signal-safe. The default disposition is restored because
 * returning normally from a SIGSEGV handler restarts the faulting instruction,
 * which would re-enter this handler forever.
 *
 * @param num The number of the signal being delivered.
 */
void sig_handler(int num) {
    static const char prefix[] = "I received signal:";
    // prefix (18) + sign (1) + up to 10 digits + ".\n" (2) fits comfortably.
    char msg[sizeof(prefix) + 16];
    size_t len = 0;
    for(; prefix[len] != '\0'; ++len) {
        msg[len] = prefix[len];
    }

    // Work on the magnitude as unsigned so INT_MIN cannot overflow.
    unsigned int magnitude = static_cast<unsigned int>(num);
    if(num < 0) {
        msg[len++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[12];
    size_t ndigits = 0;
    do {
        digits[ndigits++] = static_cast<char>('0' + magnitude % 10);
        magnitude /= 10;
    } while(magnitude != 0);
    while(ndigits > 0) {
        msg[len++] = digits[--ndigits];
    }
    msg[len++] = '.';
    msg[len++] = '\n';

    // Nothing useful can be done if the write fails inside a signal handler.
    const ssize_t ignored = write(STDOUT_FILENO, msg, len);
    (void)ignored;

    // Hand the signal back to the default action so the process stops.
    signal(num, SIG_DFL);
    raise(num);
}
22 
// A test class: a bare int with a textual rendering.
//
// The member has no in-class initializer and the class declares no
// constructors, so X remains a trivial aggregate.
// NOTE(review): presumably the persistence layer relies on that to store X
// as plain bytes - confirm before adding constructors or initializers.
class X {
public:
    int x;

    // Returns the decimal representation of x. Const-qualified so it can be
    // called through const references and pointers.
    std::string to_string() const {
        return std::to_string(x);
    }
};
31 
// Empty tag type; only its typeid is used in this file (it is passed to the
// PersistentRegistry and to getMinimumLatestPersistedVersion).
class ReplicatedT {};
34 
// Registry handed (as &pr) to the persistent globals px1, npx and dx declared
// further down. It is built from typeid(ReplicatedT), 123 and 321 - the same
// triple that the listing code passes to getMinimumLatestPersistedVersion().
// NOTE(review): the role of the leading nullptr and the meaning of 123/321
// (presumably subgroup index and shard number) are not visible in this file -
// confirm against PersistentRegistry.
PersistentRegistry pr(nullptr, typeid(ReplicatedT), 123, 321);

// Capacity, in bytes, of VariableBytes::buf (1 GiB).
#define MAX_VB_SIZE (1ull << 30)
38 // A variable that can change the length of its value
40 public:
41  std::size_t data_len;
42  char buf[MAX_VB_SIZE];
43 
45  data_len = MAX_VB_SIZE;
46  }
47 
48  virtual std::size_t to_bytes(char* v) const {
49  memcpy(v, buf, this->data_len);
50  return data_len;
51  };
52 
53  virtual void post_object(const std::function<void(char const* const, std::size_t)>& func) const {
54  func(this->buf, this->data_len);
55  };
56 
57  virtual std::size_t bytes_size() const {
58  return this->data_len;
59  };
60 
62  // do nothing, we don't need DSM.
63  };
64 
65  virtual std::string to_string() {
66  return std::string{buf};
67  };
68 
69  static std::unique_ptr<VariableBytes> from_bytes(DeserializationManager* dsm, char const* const v) {
70  VariableBytes vb;
71  std::unique_ptr<VariableBytes> pvb = std::make_unique<VariableBytes>();
72  strcpy(pvb->buf, v);
73  pvb->data_len = strlen(v) + 1;
74  return pvb;
75  };
76 };
77 
79 // test delta
80 class IntegerWithDelta : public ByteRepresentable, IDeltaSupport<IntegerWithDelta> {
81 public:
82  int value;
83  int delta;
84  IntegerWithDelta(int v) : value(v), delta(0) {}
85  IntegerWithDelta() : value(0), delta(0) {}
86  int add(int op) {
87  this->value += op;
88  this->delta += op;
89  return this->value;
90  }
91  int sub(int op) {
92  this->value -= op;
93  this->delta -= op;
94  return this->value;
95  }
96  virtual void finalizeCurrentDelta(const DeltaFinalizer& dp) {
97  // finalize current delta
98  dp((char const* const) & (this->delta), sizeof(this->delta));
99  // clear delta
100  this->delta = 0;
101  }
102  virtual void applyDelta(char const* const pdat) {
103  // apply delta
104  this->value += *((const int* const)pdat);
105  }
106  static std::unique_ptr<IntegerWithDelta> create(mutils::DeserializationManager* dm) {
107  // create
108  return std::make_unique<IntegerWithDelta>();
109  }
110 
111  virtual const std::string to_string() {
112  return std::to_string(this->value);
113  };
114 
116 };
118 
// Prints the command summary, one line per command, followed by a notice
// about the stack-size limit that affects the eval command.
static void
printhelp() {
    static const char* const usage_lines[] = {
            "usage:",
            "\tgetbyidx <index>",
            "\tgetbyver <version>",
            "\tgetbytime <timestamp>",
            "\tset <value> <version>",
            "\ttrimbyidx <index>",
            "\ttrimbyver <version>",
            "\ttrimbytime <time>",
            "\ttruncate <version>",
            "\tlist",
            "\tvolatile",
            "\thlc",
            "\tnologsave <int-value>",
            "\tnologload",
            "\teval <file|mem> <datasize> <num> [batch]",
            "\tlogtail-set <value> <version>",
            "\tlogtail-list",
            "\tlogtail-serialize [since-ver]",
            "\tlogtail-trim <version>",
            "\tlogtail-apply",
            "\tdelta-list",
            "\tdelta-add <op> <version>",
            "\tdelta-sub <op> <version>",
            "\tdelta-getbyidx <index>",
            "\tdelta-getbyver <version>",
    };
    for(const char* line : usage_lines) {
        cout << line << endl;
    }
    cout << "NOTICE: test can crash if <datasize> is too large(>8MB).\n"
         << "This is probably due to the stack size is limited. Try \n"
         << " \"ulimit -s unlimited\"\n"
         << "to remove this limitation." << endl;
}
151 
// Persistent X, registered with pr.
Persistent<X> px1([]() { return std::make_unique<X>(); }, nullptr, &pr);
//Persistent<X> px1;
// npx is the variable-length value that the un-prefixed commands in main()
// (list, get*, set, trim*, truncate) operate on; it is registered with pr.
// npx_logtail backs the logtail-* commands and is constructed without a
// registry argument.
Persistent<VariableBytes> npx([]() { return std::make_unique<VariableBytes>(); }, nullptr, &pr),
        npx_logtail([]() { return std::make_unique<VariableBytes>(); });
//Persistent<X,ST_MEM> px2;
// Volatile X exercised by the "volatile" command.
Volatile<X> px2([]() { return std::make_unique<X>(); });
// Delta-capable integer driven by the delta-* commands; registered with pr.
Persistent<IntegerWithDelta> dx([]() { return std::make_unique<IntegerWithDelta>(); }, nullptr, &pr);
159 
160 template <typename OT, StorageType st = ST_FILE>
162  int64_t nv = var.getNumOfVersions();
163  int64_t idx = var.getEarliestIndex();
164  cout << "Number of Versions:\t" << nv << endl;
165  while(nv--) {
166  /*
167  // by lambda
168  var.getByIndex(nv,
169  [&](OT& x) {
170  cout<<"["<<nv<<"]\t"<<x.to_string()<<"\t//by lambda"<<endl;
171  });
172 */
173  // by copy
174  cout << "[" << idx << "]\t" << var.getByIndex(idx)->to_string() << "\t//by copy" << endl;
175  idx++;
176  }
177  // list minimum latest persisted version:
178  cout << "list minimum latest persisted version:" << getMinimumLatestPersistedVersion(typeid(ReplicatedT), 123, 321) << endl;
179 }
180 
181 static void nologsave(int value) {
182  saveObject(value);
183  saveObject<int, ST_MEM>(value);
184 }
185 
186 static void nologload() {
187  cout << "in file:" << *loadObject<int>() << endl;
188  cout << "in memory:" << *loadObject<int, ST_MEM>() << endl;
189 }
190 
191 static void test_hlc();
192 template <StorageType st = ST_FILE>
193 static void eval_write(std::size_t osize, int nops, bool batch) {
194  VariableBytes writeMe;
195  Persistent<VariableBytes, st> pvar([]() { return std::make_unique<VariableBytes>(); });
196  writeMe.data_len = osize;
197  struct timespec ts, te;
198  int cnt = nops;
199  int64_t ver = pvar.getLatestVersion();
200  ver = (ver == INVALID_VERSION) ? 0 : ver + 1;
201  clock_gettime(CLOCK_REALTIME, &ts);
202  while(cnt-- > 0) {
203  pvar.set(writeMe, ver++);
204  if(!batch) pvar.persist();
205  }
206  if(batch) {
207  pvar.persist();
208  }
209 
210 #if defined(_PERFORMANCE_DEBUG) || defined(_DEBUG)
211  pvar.print_performance_stat();
212 #endif //_PERFORMANCE_DEBUG
213 
214  clock_gettime(CLOCK_REALTIME, &te);
215  long sec = (te.tv_sec - ts.tv_sec);
216  long nsec = sec * 1000000000 + te.tv_nsec - ts.tv_nsec;
217  dbg_default_warn("nanosecond={}\n", nsec);
218  double thp_MBPS = (double)osize * nops / (double)nsec * 1000;
219  double lat_us = (double)nsec / nops / 1000;
220  cout << "WRITE TEST(st=" << st << ", size=" << osize << " byte, ops=" << nops << ")" << endl;
221  cout << "throughput:\t" << thp_MBPS << " MB/s" << endl;
222  cout << "latency:\t" << lat_us << " microseconds" << endl;
223 }
224 
225 int main(int argc, char** argv) {
226  spdlog::set_level(spdlog::level::trace);
227 
228  signal(SIGSEGV, sig_handler);
229 
230  if(argc < 2) {
231  printhelp();
232  return 0;
233  }
234 
235  std::cout << "command:" << argv[1] << std::endl;
236 
237  try {
238  if(strcmp(argv[1], "list") == 0) {
239  cout << "Persistent<VariableBytes> npx:" << endl;
240  listvar<VariableBytes>(npx);
241  //cout<<"Persistent<X,ST_MEM> px2:"<<endl;
242  //listvar<X,ST_MEM>(px2);
243  } else if(strcmp(argv[1], "logtail-list") == 0) {
244  cout << "Persistent<VariableBytes> npx:" << endl;
245  listvar<VariableBytes>(npx_logtail);
246  } else if(strcmp(argv[1], "getbyidx") == 0) {
247  int64_t nv = atol(argv[2]);
248  // by lambda
249  /*
250  npx.getByIndex(nv,
251  [&](VariableBytes& x) {
252  cout<<"["<<nv<<"]\t"<<x.to_string()<<"\t//by lambda"<<endl;
253  });
254 */
255  // by copy
256  cout << "[" << nv << "]\t" << npx.getByIndex(nv)->to_string() << "\t//by copy" << endl;
257  } else if(strcmp(argv[1], "getbyver") == 0) {
258  int64_t ver = atoi(argv[2]);
259  /*
260  // by lambda
261  npx.get(ver,
262  [&](VariableBytes& x) {
263  cout<<"["<<(uint64_t)(ver>>64)<<"."<<(uint64_t)ver<<"]\t"<<x.to_string()<<"\t//by lambda"<<endl;
264  });
265 */
266  // by copy
267  cout << "[" << ver << "]\t" << npx.get(ver)->to_string() << "\t//by copy" << endl;
268  } else if(strcmp(argv[1], "getbytime") == 0) {
269  HLC hlc;
270  hlc.m_rtc_us = atol(argv[2]);
271  hlc.m_logic = 0;
272  /*
273  npx.get(hlc,
274  [&](VariableBytes& x) {
275  cout<<"[("<<hlc.m_rtc_us<<",0)]\t"<<x.to_string()<<"\t//bylambda"<<endl;
276  });
277 */
278  cout << "["
279  << "[(" << hlc.m_rtc_us << ",0)]\t" << npx.get(hlc)->to_string() << "\t//by copy" << endl;
280  } else if(strcmp(argv[1], "trimbyidx") == 0) {
281  int64_t nv = atol(argv[2]);
282  npx.trim(nv);
283  npx.persist();
284  cout << "trim till index " << nv << " successfully" << endl;
285  } else if(strcmp(argv[1], "trimbyver") == 0) {
286  int64_t ver = atol(argv[2]);
287  npx.trim(ver);
288  npx.persist();
289  cout << "trim till ver " << ver << " successfully" << endl;
290  } else if(strcmp(argv[1], "truncate") == 0) {
291  int64_t ver = atol(argv[2]);
292  npx.truncate(ver);
293  cout << "truncated after version" << ver << "successfully" << endl;
294  } else if(strcmp(argv[1], "trimbytime") == 0) {
295  HLC hlc;
296  hlc.m_rtc_us = atol(argv[2]);
297  hlc.m_logic = 0;
298  npx.trim(hlc);
299  npx.persist();
300  cout << "trim till time " << hlc.m_rtc_us << " successfully" << endl;
301  } else if(strcmp(argv[1], "set") == 0) {
302  char* v = argv[2];
303  int64_t ver = (int64_t)atoi(argv[3]);
304  sprintf((*npx).buf, "%s", v);
305  (*npx).data_len = strlen(v) + 1;
306  npx.version(ver);
307  npx.persist();
308  } else if(strcmp(argv[1], "logtail-set") == 0) {
309  char* v = argv[2];
310  int64_t ver = (int64_t)atoi(argv[3]);
311  sprintf((*npx_logtail).buf, "%s", v);
312  (*npx_logtail).data_len = strlen(v) + 1;
313  npx_logtail.version(ver);
314  npx_logtail.persist();
315  }
316 #define LOGTAIL_FILE "logtail.ser"
317  else if(strcmp(argv[1], "logtail-serialize") == 0) {
318  int64_t ver = INVALID_VERSION;
319  if(argc >= 3) {
320  ver = (int64_t)atoi(argv[2]);
321  }
323  ssize_t ds1 = npx_logtail.bytes_size();
324  ssize_t prefix = mutils::bytes_size(npx_logtail.getObjectName()) + mutils::bytes_size(*npx_logtail);
325  char* buf = (char*)malloc(ds1);
326  if(buf == NULL) {
327  cerr << "faile to allocate " << ds1 << " bytes for serialized data. prefix=" << prefix << " bytes" << endl;
328  return -1;
329  }
330  ssize_t ds2 = npx_logtail.to_bytes(buf);
331  cout << "serialization requested " << (ds1 - prefix) << " bytes, used " << (ds2 - prefix) << " bytes" << endl;
332  int fd = open(LOGTAIL_FILE, O_CREAT | O_WRONLY, S_IWUSR | S_IRUSR);
333  if(fd == -1) {
334  cerr << "failed to open file " << LOGTAIL_FILE << endl;
335  return -1;
336  }
337  ssize_t ds3 = write(fd, (void*)((uint64_t)buf + prefix), ds2 - prefix);
338  if(ds3 == -1) {
339  cerr << "failed to write the buffer to file " << LOGTAIL_FILE << endl;
340  free(buf);
341  return -1;
342  }
343  free(buf);
344  close(fd);
345  } else if(strcmp(argv[1], "logtail-trim") == 0) {
346  int64_t ver = atol(argv[2]);
347  npx_logtail.trim(ver);
348  npx_logtail.persist();
349  cout << "logtail-trim till ver " << ver << " successfully" << endl;
350  } else if(strcmp(argv[1], "logtail-apply") == 0) {
351  //load the serialized logtail.
352  int fd = open(LOGTAIL_FILE, O_RDONLY);
353  if(fd == -1) {
354  cerr << "failed to open file " << LOGTAIL_FILE << endl;
355  return -1;
356  }
357 
358  off_t fsize = lseek(fd, 0, SEEK_END);
359  lseek(fd, 0, SEEK_CUR);
360 
361  void* buf = mmap(NULL, (size_t)fsize, PROT_READ, MAP_SHARED, fd, 0);
362  if(buf == MAP_FAILED) {
363  cerr << "failed to map buffer." << endl;
364  return -1;
365  }
366 
367  cout << "before applyLogTail." << endl;
368  npx_logtail.applyLogTail(nullptr, (char*)buf);
369  cout << "after applyLogTail." << endl;
370 
371  munmap(buf, (size_t)fsize);
372  close(fd);
373  } else if(strcmp(argv[1], "volatile") == 0) {
374  cout << "loading Persistent<X,ST_MEM> px2" << endl;
375  listvar<X, ST_MEM>(px2);
376  int64_t ver = (int64_t)0L;
377  X x;
378  x.x = 1;
379  px2.set(x, ver++);
380  px2.persist();
381  cout << "after set 1" << endl;
382  listvar<X, ST_MEM>(px2);
383  x.x = 10;
384  px2.set(x, ver++);
385  px2.persist();
386  cout << "after set 10" << endl;
387  listvar<X, ST_MEM>(px2);
388  x.x = 100;
389  px2.set(x, ver++);
390  px2.persist();
391  cout << "after set 100" << endl;
392  listvar<X, ST_MEM>(px2);
393  } else if(strcmp(argv[1], "hlc") == 0) {
394  test_hlc();
395  } else if(strcmp(argv[1], "nologsave") == 0) {
396  nologsave(atoi(argv[2]));
397  } else if(strcmp(argv[1], "nologload") == 0) {
398  nologload();
399  } else if(strcmp(argv[1], "eval") == 0) {
400  // eval file|mem osize nops
401  int osize = atoi(argv[3]);
402  int nops = atoi(argv[4]);
403  bool batch = false;
404 
405  if(argc >= 6) {
406  batch = (strcmp(argv[5], "batch") == 0);
407  }
408 
409  if(strcmp(argv[2], "file") == 0) {
410  eval_write<ST_FILE>(osize, nops, batch);
411  } else if(strcmp(argv[2], "mem") == 0) {
412  eval_write<ST_MEM>(osize, nops, batch);
413  } else {
414  cout << "unknown storage type:" << argv[2] << endl;
415  }
416  } else if(strcmp(argv[1], "delta-add") == 0) {
417  int op = std::stoi(argv[2]);
418  int64_t ver = (int64_t)atoi(argv[3]);
419  cout << "add(" << op << ") = " << (*dx).add(op) << endl;
420  dx.version(ver);
421  dx.persist();
422  } else if(strcmp(argv[1], "delta-sub") == 0) {
423  int op = std::stoi(argv[2]);
424  int64_t ver = (int64_t)atoi(argv[3]);
425  cout << "sub(" << op << ") = " << (*dx).sub(op) << endl;
426  dx.version(ver);
427  dx.persist();
428  } else if(strcmp(argv[1], "delta-list") == 0) {
429  cout << "Persistent<IntegerWithDelta>:" << endl;
430  listvar<IntegerWithDelta>(dx);
431  } else if(strcmp(argv[1], "delta-getbyidx") == 0) {
432  int64_t index = std::stoi(argv[1]);
433  cout << "dx[idx:" << index << "] = " << dx.getByIndex(index)->value << endl;
434  } else if(strcmp(argv[1], "delta-getbyver") == 0) {
435  int64_t version = std::stoi(argv[1]);
436  cout << "dx[idx:" << version << "] = " << dx[version]->value << endl;
437  } else {
438  cout << "unknown command: " << argv[1] << endl;
439  printhelp();
440  }
441  } catch(unsigned long long exp) {
442  cerr << "Exception captured:0x" << std::hex << exp << endl;
443  return -1;
444  }
445 
446  return 0;
447 }
448 
449 static inline void print_hlc(const char* name, const HLC& hlc) {
450  cout << "HLC\t" << name << "(" << hlc.m_rtc_us << "," << hlc.m_logic << ")" << endl;
451 }
452 
453 void test_hlc() {
454  cout << "creating 2 HLC: h1 and h2." << endl;
455  HLC h1, h2;
456  h1.tick(h2);
457  print_hlc("h1", h1);
458  print_hlc("h2", h2);
459 
460  cout << "\nh1.tick()\t" << endl;
461  h1.tick();
462  print_hlc("h1", h1);
463  print_hlc("h2", h2);
464 
465  cout << "\nh2.tick(h1)\t" << endl;
466  h2.tick(h1);
467  print_hlc("h1", h1);
468  print_hlc("h2", h2);
469 
470  cout << "\ncomparison" << endl;
471  cout << "h1>h2\t" << (h1 > h2) << endl;
472  cout << "h1<h2\t" << (h1 < h2) << endl;
473  cout << "h1>=h2\t" << (h1 >= h2) << endl;
474  cout << "h1<=h2\t" << (h1 <= h2) << endl;
475  cout << "h1==h2\t" << (h1 == h2) << endl;
476 
477  cout << "\nevaluation:h1=h2" << endl;
478  h1 = h2;
479  print_hlc("h1", h1);
480  print_hlc("h2", h2);
481  cout << "h1>h2\t" << (h1 > h2) << endl;
482  cout << "h1<h2\t" << (h1 < h2) << endl;
483  cout << "h1>=h2\t" << (h1 >= h2) << endl;
484  cout << "h1<=h2\t" << (h1 <= h2) << endl;
485  cout << "h1==h2\t" << (h1 == h2) << endl;
486 }
This file include all common types internal to derecho and not necessarily being known by a client pr...
virtual int64_t getEarliestIndex() noexcept(false)
get the earliest index excluding trimmed ones.
virtual void ensure_registered(DeserializationManager &dsm)
Definition: test.cpp:61
virtual void finalizeCurrentDelta(const DeltaFinalizer &dp)
Definition: test.cpp:96
virtual int64_t getNumOfVersions() noexcept(false)
get the number of versions excluding trimmed ones.
void sig_handler(int num)
Definition: test.cpp:19
int argc
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
get the minimum latest persisted version for a Replicated<T> identified by
virtual std::string to_string()
Definition: test.cpp:65
#define DEFAULT_SERIALIZATION_SUPPORT(CLASS_NAME, CLASS_MEMBERS...)
THIS (below) is the only user-facing macro in this file.
std::function< void(char const *const, std::size_t)> DeltaFinalizer
Definition: Persistent.hpp:255
Persistent< X > px1([]() { return std::make_unique< X >();}, nullptr, &pr)
A non-POD type which wishes to mark itself byte representable should extend this class.
auto bytes_size(const T &)
Just calls sizeof(T)
#define MAX_VB_SIZE
Definition: test.cpp:37
IntegerWithDelta()
Definition: test.cpp:85
Persistent< IntegerWithDelta > dx([]() { return std::make_unique< IntegerWithDelta >();}, nullptr, &pr)
void listvar(Persistent< OT, st > &var)
Definition: test.cpp:161
virtual void applyDelta(char const *const pdat)
Definition: test.cpp:102
virtual void post_object(const std::function< void(char const *const, std::size_t)> &func) const
Pass a pointer to a buffer containing this class's marshalled representation into the function f...
Definition: test.cpp:53
Definition: test.cpp:24
char ** argv
The manager for any RemoteDeserializationContexts.
static std::unique_ptr< IntegerWithDelta > create(mutils::DeserializationManager *dm)
Definition: test.cpp:106
int main(int argc, char **argv)
Definition: test.cpp:5
virtual std::size_t to_bytes(char *v) const
Write this class's marshalled representation into the array found at v.
Definition: test.cpp:48
static void setEarliestVersionToSerialize(const int64_t &ver) noexcept(true)
Set the earliest version for serialization, exclusive.
Definition: Persistent.cpp:42
#define LOGTAIL_FILE
Definition: Bytes.hpp:5
Volatile< X > px2([]() { return std::make_unique< X >();})
const std::string to_string()
Definition: test.cpp:27
#define INVALID_VERSION
Definition: PersistLog.hpp:28
VariableBytes()
Definition: test.cpp:44
IntegerWithDelta(int v)
Definition: test.cpp:84
int x
Definition: test.cpp:26
Definition: HLC.hpp:7
int sub(int op)
Definition: test.cpp:91
PersistentRegistry pr(nullptr, typeid(ReplicatedT), 123, 321)
Persistent< VariableBytes > npx_logtail([]() { return std::make_unique< VariableBytes >();})
virtual void tick(bool thread_safe=true) noexcept(false)
Definition: HLC.cpp:44
void saveObject(ObjectType &obj, const char *object_name) noexcept(false)
saveObject() saves a serializable object
virtual std::size_t bytes_size() const
the size of the marshalled representation of this object.
Definition: test.cpp:57
static std::unique_ptr< VariableBytes > from_bytes(DeserializationManager *dsm, char const *const v)
Definition: test.cpp:69
auto getByIndex(int64_t idx, const Func &fun, mutils::DeserializationManager *dm=nullptr) noexcept(false)
get a version of Value T.
std::size_t data_len
Definition: test.cpp:41
int add(int op)
Definition: test.cpp:86
uint64_t m_logic
Definition: HLC.hpp:13
virtual const std::string to_string()
Definition: test.cpp:111
uint64_t m_rtc_us
Definition: HLC.hpp:12
PersistentRegistry is a book for all the Persistent<T> or Volatile<T> variables.
Definition: Persistent.hpp:81
char buf[MAX_VB_SIZE]
Definition: test.cpp:42
#define dbg_default_warn(...)
Definition: logger.hpp:46
Persistent< VariableBytes > npx([]() { return std::make_unique< VariableBytes >();}, nullptr, &pr)