7 #include <spdlog/spdlog.h> 20 printf(
"I received signal:%d.\n", num);
28 return std::to_string(x);
37 #define MAX_VB_SIZE (1ull << 30) 49 memcpy(v, buf, this->data_len);
53 virtual void post_object(
const std::function<
void(
char const*
const, std::size_t)>& func)
const {
54 func(this->buf, this->data_len);
58 return this->data_len;
66 return std::string{buf};
71 std::unique_ptr<VariableBytes> pvb = std::make_unique<VariableBytes>();
98 dp((
char const*
const) & (this->delta),
sizeof(this->delta));
104 this->value += *((
const int*
const)pdat);
108 return std::make_unique<IntegerWithDelta>();
112 return std::to_string(this->value);
121 cout <<
"usage:" << endl;
122 cout <<
"\tgetbyidx <index>" << endl;
123 cout <<
"\tgetbyver <version>" << endl;
124 cout <<
"\tgetbytime <timestamp>" << endl;
125 cout <<
"\tset <value> <version>" << endl;
126 cout <<
"\ttrimbyidx <index>" << endl;
127 cout <<
"\ttrimbyver <version>" << endl;
128 cout <<
"\ttrimbytime <time>" << endl;
129 cout <<
"\ttruncate <version>" << endl;
130 cout <<
"\tlist" << endl;
131 cout <<
"\tvolatile" << endl;
132 cout <<
"\thlc" << endl;
133 cout <<
"\tnologsave <int-value>" << endl;
134 cout <<
"\tnologload" << endl;
135 cout <<
"\teval <file|mem> <datasize> <num> [batch]" << endl;
136 cout <<
"\tlogtail-set <value> <version>" << endl;
137 cout <<
"\tlogtail-list" << endl;
138 cout <<
"\tlogtail-serialize [since-ver]" << endl;
139 cout <<
"\tlogtail-trim <version>" << endl;
140 cout <<
"\tlogtail-apply" << endl;
141 cout <<
"\tdelta-list" << endl;
142 cout <<
"\tdelta-add <op> <version>" << endl;
143 cout <<
"\tdelta-sub <op> <version>" << endl;
144 cout <<
"\tdelta-getbyidx <index>" << endl;
145 cout <<
"\tdelta-getbyver <version>" << endl;
146 cout <<
"NOTICE: test can crash if <datasize> is too large(>8MB).\n" 147 <<
"This is probably due to the stack size is limited. Try \n" 148 <<
" \"ulimit -s unlimited\"\n" 149 <<
"to remove this limitation." << endl;
155 npx_logtail([]() {
return std::make_unique<VariableBytes>(); });
160 template <
typename OT, StorageType st = ST_FILE>
164 cout <<
"Number of Versions:\t" << nv << endl;
174 cout <<
"[" << idx <<
"]\t" << var.
getByIndex(idx)->to_string() <<
"\t//by copy" << endl;
181 static void nologsave(
int value) {
183 saveObject<int, ST_MEM>(value);
186 static void nologload() {
187 cout <<
"in file:" << *loadObject<int>() << endl;
188 cout <<
"in memory:" << *loadObject<int, ST_MEM>() << endl;
191 static void test_hlc();
192 template <StorageType st = ST_FILE>
193 static void eval_write(std::size_t osize,
int nops,
bool batch) {
197 struct timespec ts, te;
199 int64_t ver = pvar.getLatestVersion();
201 clock_gettime(CLOCK_REALTIME, &ts);
203 pvar.set(writeMe, ver++);
204 if(!batch) pvar.persist();
210 #if defined(_PERFORMANCE_DEBUG) || defined(_DEBUG) 211 pvar.print_performance_stat();
212 #endif //_PERFORMANCE_DEBUG 214 clock_gettime(CLOCK_REALTIME, &te);
215 long sec = (te.tv_sec - ts.tv_sec);
216 long nsec = sec * 1000000000 + te.tv_nsec - ts.tv_nsec;
218 double thp_MBPS = (double)osize * nops / (
double)nsec * 1000;
219 double lat_us = (double)nsec / nops / 1000;
220 cout <<
"WRITE TEST(st=" << st <<
", size=" << osize <<
" byte, ops=" << nops <<
")" << endl;
221 cout <<
"throughput:\t" << thp_MBPS <<
" MB/s" << endl;
222 cout <<
"latency:\t" << lat_us <<
" microseconds" << endl;
226 spdlog::set_level(spdlog::level::trace);
235 std::cout <<
"command:" << argv[1] << std::endl;
238 if(strcmp(argv[1],
"list") == 0) {
239 cout <<
"Persistent<VariableBytes> npx:" << endl;
240 listvar<VariableBytes>(
npx);
243 }
else if(strcmp(argv[1],
"logtail-list") == 0) {
244 cout <<
"Persistent<VariableBytes> npx:" << endl;
246 }
else if(strcmp(argv[1],
"getbyidx") == 0) {
247 int64_t nv = atol(argv[2]);
256 cout <<
"[" << nv <<
"]\t" <<
npx.getByIndex(nv)->to_string() <<
"\t//by copy" << endl;
257 }
else if(strcmp(argv[1],
"getbyver") == 0) {
258 int64_t ver = atoi(argv[2]);
267 cout <<
"[" << ver <<
"]\t" <<
npx.get(ver)->to_string() <<
"\t//by copy" << endl;
268 }
else if(strcmp(argv[1],
"getbytime") == 0) {
279 <<
"[(" << hlc.
m_rtc_us <<
",0)]\t" <<
npx.get(hlc)->to_string() <<
"\t//by copy" << endl;
280 }
else if(strcmp(argv[1],
"trimbyidx") == 0) {
281 int64_t nv = atol(argv[2]);
284 cout <<
"trim till index " << nv <<
" successfully" << endl;
285 }
else if(strcmp(argv[1],
"trimbyver") == 0) {
286 int64_t ver = atol(argv[2]);
289 cout <<
"trim till ver " << ver <<
" successfully" << endl;
290 }
else if(strcmp(argv[1],
"truncate") == 0) {
291 int64_t ver = atol(argv[2]);
293 cout <<
"truncated after version" << ver <<
"successfully" << endl;
294 }
else if(strcmp(argv[1],
"trimbytime") == 0) {
300 cout <<
"trim till time " << hlc.
m_rtc_us <<
" successfully" << endl;
301 }
else if(strcmp(argv[1],
"set") == 0) {
303 int64_t ver = (int64_t)atoi(argv[3]);
304 sprintf((*npx).buf,
"%s", v);
305 (*npx).data_len = strlen(v) + 1;
308 }
else if(strcmp(argv[1],
"logtail-set") == 0) {
310 int64_t ver = (int64_t)atoi(argv[3]);
311 sprintf((*npx_logtail).buf,
"%s", v);
312 (*npx_logtail).data_len = strlen(v) + 1;
316 #define LOGTAIL_FILE "logtail.ser" 317 else if(strcmp(argv[1],
"logtail-serialize") == 0) {
320 ver = (int64_t)atoi(argv[2]);
325 char* buf = (
char*)malloc(ds1);
327 cerr <<
"faile to allocate " << ds1 <<
" bytes for serialized data. prefix=" << prefix <<
" bytes" << endl;
331 cout <<
"serialization requested " << (ds1 - prefix) <<
" bytes, used " << (ds2 - prefix) <<
" bytes" << endl;
332 int fd = open(
LOGTAIL_FILE, O_CREAT | O_WRONLY, S_IWUSR | S_IRUSR);
337 ssize_t ds3 = write(fd, (
void*)((uint64_t)buf + prefix), ds2 - prefix);
339 cerr <<
"failed to write the buffer to file " <<
LOGTAIL_FILE << endl;
345 }
else if(strcmp(argv[1],
"logtail-trim") == 0) {
346 int64_t ver = atol(argv[2]);
349 cout <<
"logtail-trim till ver " << ver <<
" successfully" << endl;
350 }
else if(strcmp(argv[1],
"logtail-apply") == 0) {
358 off_t fsize = lseek(fd, 0, SEEK_END);
359 lseek(fd, 0, SEEK_CUR);
361 void* buf = mmap(NULL, (
size_t)fsize, PROT_READ, MAP_SHARED, fd, 0);
362 if(buf == MAP_FAILED) {
363 cerr <<
"failed to map buffer." << endl;
367 cout <<
"before applyLogTail." << endl;
369 cout <<
"after applyLogTail." << endl;
371 munmap(buf, (
size_t)fsize);
373 }
else if(strcmp(argv[1],
"volatile") == 0) {
374 cout <<
"loading Persistent<X,ST_MEM> px2" << endl;
375 listvar<X, ST_MEM>(
px2);
376 int64_t ver = (int64_t)0L;
381 cout <<
"after set 1" << endl;
382 listvar<X, ST_MEM>(
px2);
386 cout <<
"after set 10" << endl;
387 listvar<X, ST_MEM>(
px2);
391 cout <<
"after set 100" << endl;
392 listvar<X, ST_MEM>(
px2);
393 }
else if(strcmp(argv[1],
"hlc") == 0) {
395 }
else if(strcmp(argv[1],
"nologsave") == 0) {
396 nologsave(atoi(argv[2]));
397 }
else if(strcmp(argv[1],
"nologload") == 0) {
399 }
else if(strcmp(argv[1],
"eval") == 0) {
401 int osize = atoi(argv[3]);
402 int nops = atoi(argv[4]);
406 batch = (strcmp(argv[5],
"batch") == 0);
409 if(strcmp(argv[2],
"file") == 0) {
410 eval_write<ST_FILE>(osize, nops, batch);
411 }
else if(strcmp(argv[2],
"mem") == 0) {
412 eval_write<ST_MEM>(osize, nops, batch);
414 cout <<
"unknown storage type:" << argv[2] << endl;
416 }
else if(strcmp(argv[1],
"delta-add") == 0) {
417 int op = std::stoi(argv[2]);
418 int64_t ver = (int64_t)atoi(argv[3]);
419 cout <<
"add(" << op <<
") = " << (*dx).add(op) << endl;
422 }
else if(strcmp(argv[1],
"delta-sub") == 0) {
423 int op = std::stoi(argv[2]);
424 int64_t ver = (int64_t)atoi(argv[3]);
425 cout <<
"sub(" << op <<
") = " << (*dx).sub(op) << endl;
428 }
else if(strcmp(argv[1],
"delta-list") == 0) {
429 cout <<
"Persistent<IntegerWithDelta>:" << endl;
430 listvar<IntegerWithDelta>(
dx);
431 }
else if(strcmp(argv[1],
"delta-getbyidx") == 0) {
432 int64_t index = std::stoi(argv[1]);
433 cout <<
"dx[idx:" << index <<
"] = " <<
dx.getByIndex(index)->value << endl;
434 }
else if(strcmp(argv[1],
"delta-getbyver") == 0) {
435 int64_t version = std::stoi(argv[1]);
436 cout <<
"dx[idx:" << version <<
"] = " <<
dx[version]->value << endl;
438 cout <<
"unknown command: " << argv[1] << endl;
441 }
catch(
unsigned long long exp) {
442 cerr <<
"Exception captured:0x" << std::hex << exp << endl;
449 static inline void print_hlc(
const char* name,
const HLC& hlc) {
450 cout <<
"HLC\t" << name <<
"(" << hlc.
m_rtc_us <<
"," << hlc.
m_logic <<
")" << endl;
454 cout <<
"creating 2 HLC: h1 and h2." << endl;
460 cout <<
"\nh1.tick()\t" << endl;
465 cout <<
"\nh2.tick(h1)\t" << endl;
470 cout <<
"\ncomparison" << endl;
471 cout <<
"h1>h2\t" << (h1 > h2) << endl;
472 cout <<
"h1<h2\t" << (h1 < h2) << endl;
473 cout <<
"h1>=h2\t" << (h1 >= h2) << endl;
474 cout <<
"h1<=h2\t" << (h1 <= h2) << endl;
475 cout <<
"h1==h2\t" << (h1 == h2) << endl;
477 cout <<
"\nevaluation:h1=h2" << endl;
481 cout <<
"h1>h2\t" << (h1 > h2) << endl;
482 cout <<
"h1<h2\t" << (h1 < h2) << endl;
483 cout <<
"h1>=h2\t" << (h1 >= h2) << endl;
484 cout <<
"h1<=h2\t" << (h1 <= h2) << endl;
485 cout <<
"h1==h2\t" << (h1 == h2) << endl;
This file includes all common types internal to derecho that are not necessarily known by a client pr...
virtual int64_t getEarliestIndex() noexcept(false)
get the earliest index excluding trimmed ones.
virtual void ensure_registered(DeserializationManager &dsm)
virtual void finalizeCurrentDelta(const DeltaFinalizer &dp)
virtual int64_t getNumOfVersions() noexcept(false)
get the number of versions excluding trimmed ones.
void sig_handler(int num)
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
get the minimum latest persisted version for a Replicated<T> identified by
virtual std::string to_string()
#define DEFAULT_SERIALIZATION_SUPPORT(CLASS_NAME, CLASS_MEMBERS...)
THIS (below) is the only user-facing macro in this file.
std::function< void(char const *const, std::size_t)> DeltaFinalizer
Persistent< X > px1([]() { return std::make_unique< X >();}, nullptr, &pr)
A non-POD type which wishes to mark itself byte representable should extend this class.
auto bytes_size(const T &)
Just calls sizeof(T)
Persistent< IntegerWithDelta > dx([]() { return std::make_unique< IntegerWithDelta >();}, nullptr, &pr)
void listvar(Persistent< OT, st > &var)
virtual void applyDelta(char const *const pdat)
virtual void post_object(const std::function< void(char const *const, std::size_t)> &func) const
Pass a pointer to a buffer containing this class's marshalled representation into the function f...
The manager for any RemoteDeserializationContexts.
static std::unique_ptr< IntegerWithDelta > create(mutils::DeserializationManager *dm)
int main(int argc, char **argv)
virtual std::size_t to_bytes(char *v) const
Write this class's marshalled representation into the array found at v.
static void setEarliestVersionToSerialize(const int64_t &ver) noexcept(true)
Set the earliest version for serialization, exclusive.
Volatile< X > px2([]() { return std::make_unique< X >();})
const std::string to_string()
PersistentRegistry pr(nullptr, typeid(ReplicatedT), 123, 321)
Persistent< VariableBytes > npx_logtail([]() { return std::make_unique< VariableBytes >();})
virtual void tick(bool thread_safe=true) noexcept(false)
void saveObject(ObjectType &obj, const char *object_name) noexcept(false)
saveObject() saves a serializable object
virtual std::size_t bytes_size() const
the size of the marshalled representation of this object.
static std::unique_ptr< VariableBytes > from_bytes(DeserializationManager *dsm, char const *const v)
auto getByIndex(int64_t idx, const Func &fun, mutils::DeserializationManager *dm=nullptr) noexcept(false)
get a version of Value T.
virtual const std::string to_string()
PersistentRegistry is a book for all the Persistent<T> or Volatile<T> variables.
#define dbg_default_warn(...)
Persistent< VariableBytes > npx([]() { return std::make_unique< VariableBytes >();}, nullptr, &pr)