Derecho  0.9
Distributed systems toolkit for RDMA
FilePersistLog.cpp
Go to the documentation of this file.
3 #include <derecho/conf/conf.hpp>
4 #include <dirent.h>
5 #include <errno.h>
6 #include <fcntl.h>
7 #include <iostream>
8 #include <string.h>
9 #include <string>
10 #include <sys/mman.h>
11 #include <sys/stat.h>
12 #include <unistd.h>
13 
14 #if __GNUC__ > 7
15 #include <filesystem>
16 namespace fs = std::filesystem;
17 #else
18 #include <experimental/filesystem>
19 namespace fs = std::experimental::filesystem;
20 #endif
21 
22 using namespace std;
23 
24 namespace persistent {
25 
27 // internal structures //
29 
31 // visible to outside //
33 
34 FilePersistLog::FilePersistLog(const string& name, const string& dataPath) noexcept(false) : PersistLog(name),
35  m_sDataPath(dataPath),
36  m_sMetaFile(dataPath + "/" + name + "." + META_FILE_SUFFIX),
37  m_sLogFile(dataPath + "/" + name + "." + LOG_FILE_SUFFIX),
38  m_sDataFile(dataPath + "/" + name + "." + DATA_FILE_SUFFIX),
41  m_iLogFileDesc(-1),
42  m_iDataFileDesc(-1),
43  m_pLog(MAP_FAILED),
44  m_pData(MAP_FAILED) {
45  if(pthread_rwlock_init(&this->m_rwlock, NULL) != 0) {
46  throw PERSIST_EXP_RWLOCK_INIT(errno);
47  }
48  if(pthread_mutex_init(&this->m_perslock, NULL) != 0) {
49  throw PERSIST_EXP_MUTEX_INIT(errno);
50  }
51  dbg_default_trace("{0} constructor: before load()", name);
53  reset();
54  }
55  load();
56  dbg_default_trace("{0} constructor: after load()", name);
57 }
58 
59 void FilePersistLog::reset() noexcept(false) {
60  dbg_default_trace("{0} reset state...begin", this->m_sName);
61  if(fs::exists(this->m_sMetaFile)) {
62  if(!fs::remove(this->m_sMetaFile)) {
63  dbg_default_error("{0} reset failed to remove the file:{1}", this->m_sName, this->m_sMetaFile);
64  throw PERSIST_EXP_REMOVE_FILE(errno);
65  }
66  if(!fs::remove(this->m_sLogFile)) {
67  dbg_default_error("{0} reset failed to remove the file:{1}", this->m_sName, this->m_sLogFile);
68  throw PERSIST_EXP_REMOVE_FILE(errno);
69  }
70  if(!fs::remove(this->m_sDataFile)) {
71  dbg_default_error("{0} reset failed to remove the file:{1}", this->m_sName, this->m_sDataFile);
72  throw PERSIST_EXP_REMOVE_FILE(errno);
73  }
74  }
75  dbg_default_trace("{0} reset state...done", this->m_sName);
76 }
77 
// Load (or create) the on-disk state of this log and map it into memory.
//
// Visible steps: make sure the data directory and the meta/log/data files
// exist, open the log and data files, reserve an anonymous region twice the
// size of each buffer and map the file into both halves of that region, then
// either initialize and persist a fresh header (bCreate) or read the persisted
// header from the meta file and rebuild the HLC index.
//
// Throws PERSIST_EXP_OPEN_FILE / PERSIST_EXP_MMAP_FILE / PERSIST_EXP_READ_FILE
// (built from errno), and rethrows whatever persistMetaHeaderAtomically throws.
//
// NOTE(review): the embedded listing numbers skip 98-100, 114, 139, 144, 148,
// 153, 165, 175 and 180, so this view may be missing statements or comments.
78 void FilePersistLog::load() noexcept(false) {
79  dbg_default_trace("{0}:load state...begin", this->m_sName);
80  // STEP 0: check if data path exists
81  checkOrCreateDir(this->m_sDataPath);
82  dbg_default_trace("{0}:checkOrCreateDir passed.", this->m_sName);
83  // STEP 1: check and create files.
// bCreate selects the "new header" branch of STEP 4 below.
84  bool bCreate = checkOrCreateMetaFile();
85  checkOrCreateLogFile();
86  checkOrCreateDataFile();
87  dbg_default_trace("{0}:checkOrCreateDataFile passed.", this->m_sName);
88  // STEP 2: open files
89  this->m_iLogFileDesc = open(this->m_sLogFile.c_str(), O_RDWR);
90  if(this->m_iLogFileDesc == -1) {
91  throw PERSIST_EXP_OPEN_FILE(errno);
92  }
93  this->m_iDataFileDesc = open(this->m_sDataFile.c_str(), O_RDWR);
94  if(this->m_iDataFileDesc == -1) {
95  throw PERSIST_EXP_OPEN_FILE(errno);
96  }
97  // STEP 3: mmap to memory
// Reserve 2 * MAX_LOG_SIZE of address space, then map the log file at offset
// 0 into the first half and again into the second half of that reservation.
101  this->m_pLog = mmap(NULL, MAX_LOG_SIZE << 1, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
102  if(this->m_pLog == MAP_FAILED) {
103  dbg_default_error("{0}:reserve map space for log failed.", this->m_sName);
104  throw PERSIST_EXP_MMAP_FILE(errno);
105  }
106  if(mmap(this->m_pLog, MAX_LOG_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iLogFileDesc, 0) == MAP_FAILED) {
107  dbg_default_error("{0}:map ringbuffer space for the first half of log failed. Is the size of log ringbuffer aligned to page?", this->m_sName);
108  throw PERSIST_EXP_MMAP_FILE(errno);
109  }
110  if(mmap((void*)((uint64_t)this->m_pLog + MAX_LOG_SIZE), MAX_LOG_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iLogFileDesc, 0) == MAP_FAILED) {
111  dbg_default_error("{0}:map ringbuffer space for the second half of log failed. Is the size of log ringbuffer aligned to page?", this->m_sName);
112  throw PERSIST_EXP_MMAP_FILE(errno);
113  }
// Same double mapping for the data file, sized by MAX_DATA_SIZE.
115  this->m_pData = mmap(NULL, (size_t)(MAX_DATA_SIZE << 1), PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
116  if(this->m_pData == MAP_FAILED) {
117  dbg_default_error("{0}:reserve map space for data failed.", this->m_sName);
118  throw PERSIST_EXP_MMAP_FILE(errno);
119  }
120  if(mmap(this->m_pData, (size_t)(MAX_DATA_SIZE), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iDataFileDesc, 0) == MAP_FAILED) {
121  dbg_default_error("{0}:map ringbuffer space for the first half of data failed. Is the size of data ringbuffer aligned to page?", this->m_sName);
122  throw PERSIST_EXP_MMAP_FILE(errno);
123  }
124  if(mmap((void*)((uint64_t)this->m_pData + MAX_DATA_SIZE), (size_t)MAX_DATA_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iDataFileDesc, 0) == MAP_FAILED) {
125  dbg_default_error("{0}:map ringbuffer space for the second half of data failed. Is the size of data ringbuffer aligned to page?", this->m_sName);
126  throw PERSIST_EXP_MMAP_FILE(errno);
127  }
128  dbg_default_trace("{0}:data/meta file mapped to memory", this->m_sName);
129  // STEP 4: initialize the header for new created Metafile
130  if(bCreate) {
131  META_HEADER->fields.head = 0ll;
132  META_HEADER->fields.tail = 0ll;
133  META_HEADER->fields.ver = INVALID_VERSION;
134  META_HEADER_PERS->fields.head = -1ll; // -1 means uninitialized
135  META_HEADER_PERS->fields.tail = -1ll; // -1 means uninitialized
136  META_HEADER_PERS->fields.ver = INVALID_VERSION;
137  // persist the header
138  FPL_RDLOCK;
140 
141  try {
142  persistMetaHeaderAtomically(META_HEADER);
143  } catch(uint64_t e) {
// release the lock before propagating the error
145  FPL_UNLOCK;
146  throw e;
147  }
149  FPL_UNLOCK;
150  dbg_default_info("{0}:new header initialized.", this->m_sName);
151  } else { // load META_HEADER from disk
152  FPL_WRLOCK;
154  try {
155  int fd = open(this->m_sMetaFile.c_str(), O_RDONLY);
156  if(fd == -1) {
157  throw PERSIST_EXP_OPEN_FILE(errno);
158  }
// The persisted header is read straight into META_HEADER_PERS.
159  ssize_t nRead = read(fd, (void*)META_HEADER_PERS, sizeof(MetaHeader));
160  if(nRead != sizeof(MetaHeader)) {
161  close(fd);
162  throw PERSIST_EXP_READ_FILE(errno);
163  }
164  close(fd);
// NOTE(review): nothing visible here copies META_HEADER_PERS into META_HEADER
// before the loop below reads META_HEADER; listing line 165 is absent from
// this view - confirm against the original source.
166  // update mhlc index
167  for(int64_t idx = META_HEADER->fields.head; idx < META_HEADER->fields.tail; idx++) {
168  struct hlc_index_entry _ent;
169  _ent.hlc.m_rtc_us = LOG_ENTRY_AT(idx)->fields.hlc_r;
170  _ent.hlc.m_logic = LOG_ENTRY_AT(idx)->fields.hlc_l;
171  _ent.log_idx = idx;
172  this->hidx.insert(_ent);
173  }
174  } catch(uint64_t e) {
176  FPL_UNLOCK;
177  throw e;
178  }
179 
181  FPL_UNLOCK;
182  }
183  // STEP 5: update m_hlcLE with the latest event: we don't need this anymore
184  //if (META_HEADER->fields.eno >0) {
185  // if (this->m_hlcLE.m_rtc_us < CURR_LOG_ENTRY->fields.hlc_r &&
186  // this->m_hlcLE.m_logic < CURR_LOG_ENTRY->fields.hlc_l){
187  // this->m_hlcLE.m_rtc_us = CURR_LOG_ENTRY->fields.hlc_r;
188  // this->m_hlcLE.m_logic = CURR_LOG_ENTRY->fields.hlc_l;
189  // }
190  //}
191  dbg_default_trace("{0}:load state...done", this->m_sName);
192 }
193 
194 FilePersistLog::~FilePersistLog() noexcept(true) {
195  pthread_rwlock_destroy(&this->m_rwlock);
196  pthread_mutex_destroy(&this->m_perslock);
197  if(this->m_pData != MAP_FAILED) {
198  munmap(m_pData, (size_t)(MAX_DATA_SIZE << 1));
199  }
200  this->m_pData = nullptr; // prevent ~MemLog() destructor to release it again.
201  if(this->m_pLog != MAP_FAILED) {
202  munmap(m_pLog, MAX_LOG_SIZE);
203  }
204  this->m_pLog = nullptr; // prevent ~MemLog() destructor to release it again.
205  if(this->m_iLogFileDesc != -1) {
206  close(this->m_iLogFileDesc);
207  }
208  if(this->m_iDataFileDesc != -1) {
209  close(this->m_iDataFileDesc);
210  }
211 }
212 
213 void FilePersistLog::append(const void* pdat, const uint64_t& size, const int64_t& ver, const HLC& mhlc) noexcept(false) {
214  dbg_default_trace("{0} append event ({1},{2})", this->m_sName, mhlc.m_rtc_us, mhlc.m_logic);
215  FPL_RDLOCK;
216 
217 #define __DO_VALIDATION \
218  do { \
219  if(NUM_FREE_SLOTS < 1) { \
220  dbg_default_error("{0}-append exception no free slots in log! NUM_FREE_SLOTS={1}", \
221  this->m_sName, NUM_FREE_SLOTS); \
222  dbg_default_flush(); \
223  FPL_UNLOCK; \
224  std::cerr << "PERSIST_EXP_NOSPACE_LOG: FREESLOT=" << NUM_FREE_SLOTS << ",version=" << ver << std::endl; \
225  throw PERSIST_EXP_NOSPACE_LOG; \
226  } \
227  if(NUM_FREE_BYTES < size) { \
228  dbg_default_error("{0}-append exception no space for data: NUM_FREE_BYTES={1}, size={2}", \
229  this->m_sName, NUM_FREE_BYTES, size); \
230  dbg_default_flush(); \
231  FPL_UNLOCK; \
232  std::cerr << "PERSIST_EXP_NOSPACE_DATA: FREE:" << NUM_FREE_BYTES << ",size=" << size << std::endl; \
233  throw PERSIST_EXP_NOSPACE_DATA; \
234  } \
235  if((CURR_LOG_IDX != -1) && (META_HEADER->fields.ver >= ver)) { \
236  int64_t cver = META_HEADER->fields.ver; \
237  dbg_default_error("{0}-append version already exists! cur_ver:{1} new_ver:{2}", this->m_sName, \
238  (int64_t)cver, (int64_t)ver); \
239  dbg_default_flush(); \
240  FPL_UNLOCK; \
241  std::cerr << "PERSIST_EXP_INV_VERSION:cver=" << cver << ",ver=" << ver << std::endl; \
242  throw PERSIST_EXP_INV_VERSION; \
243  } \
244  } while(0)
245 
246 #pragma GCC diagnostic ignored "-Wunused-variable"
248 #pragma GCC diagnostic pop
249  FPL_UNLOCK;
250  dbg_default_trace("{0} append:validate check1 Finished.", this->m_sName);
251 
252  FPL_WRLOCK;
253 //check
254 #pragma GCC diagnostic ignored "-Wunused-variable"
256 #pragma GCC diagnostic pop
257  dbg_default_trace("{0} append:validate check2 Finished.", this->m_sName);
258 
259  // copy data
260  memcpy(NEXT_DATA, pdat, size);
261  dbg_default_trace("{0} append:data is copied to log.", this->m_sName);
262 
263  // fill the log entry
264  NEXT_LOG_ENTRY->fields.ver = ver;
265  NEXT_LOG_ENTRY->fields.dlen = size;
266  NEXT_LOG_ENTRY->fields.ofst = NEXT_DATA_OFST;
267  NEXT_LOG_ENTRY->fields.hlc_r = mhlc.m_rtc_us;
268  NEXT_LOG_ENTRY->fields.hlc_l = mhlc.m_logic;
269  /* No Sync required here.
270  if (msync(ALIGN_TO_PAGE(NEXT_LOG_ENTRY),
271  sizeof(LogEntry) + (((uint64_t)NEXT_LOG_ENTRY) % PAGE_SIZE),MS_SYNC) != 0) {
272  FPL_UNLOCK;
273  throw PERSIST_EXP_MSYNC(errno);
274  }
275 */
276 
277  // update meta header
278  this->hidx.insert(hlc_index_entry{mhlc, META_HEADER->fields.tail});
279  META_HEADER->fields.tail++;
280  META_HEADER->fields.ver = ver;
281  dbg_default_trace("{0} append:log entry and meta data are updated.", this->m_sName);
282  /* No sync
283  if (msync(this->m_pMeta,sizeof(MetaHeader),MS_SYNC) != 0) {
284  FPL_UNLOCK;
285  throw PERSIST_EXP_MSYNC(errno);
286  }
287 */
288  dbg_default_debug("{0} append a log ver:{1} hlc:({2},{3})", this->m_sName,
289  ver, mhlc.m_rtc_us, mhlc.m_logic);
290  FPL_UNLOCK;
291 }
292 
293 void FilePersistLog::advanceVersion(const int64_t& ver) noexcept(false) {
294  FPL_WRLOCK;
295  if(META_HEADER->fields.ver < ver) {
296  META_HEADER->fields.ver = ver;
297  } else {
298  FPL_UNLOCK;
300  }
301  FPL_UNLOCK;
302 }
303 
// Flush the log to disk and return the version that is persisted afterwards.
//
// If the in-memory header already equals the persisted header, nothing is
// flushed and the current header version is returned (INVALID_VERSION when
// CURR_LOG_IDX is -1). Otherwise the header is copied into a shadow, the dirty
// ranges of the data and log mappings are msync'ed, and the shadow header is
// written with persistMetaHeaderAtomically().
//
// preLocked: when true the caller already holds the locks and this function
// neither takes nor releases them (trimByIndex calls persist(true)).
//
// Throws PERSIST_EXP_MSYNC(errno) when msync fails, and rethrows any uint64_t
// raised while flushing.
//
// NOTE(review): the embedded listing numbers skip 307, 318, 330, 361 and 368.
// As shown, the braces inside the try block do not balance and the two
// trailing `if(!preLocked)` bodies are empty, so statements are missing from
// this view - confirm against the original source before changing the logic.
304 const int64_t FilePersistLog::persist(const bool preLocked) noexcept(false) {
305  int64_t ver_ret = INVALID_VERSION;
306  if(!preLocked) {
308  FPL_RDLOCK;
309  }
310 
// Fast path: nothing has changed since the last persist.
311  if(*META_HEADER == *META_HEADER_PERS) {
312  if(CURR_LOG_IDX != -1) {
313  //ver_ret = LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver;
314  ver_ret = META_HEADER->fields.ver;
315  }
316  if(!preLocked) {
317  FPL_UNLOCK;
319  }
320  return ver_ret;
321  }
322 
323  //flush data
324  dbg_default_trace("{0} flush data,log,and meta.", this->m_sName);
325  try {
326  // shadow the current state
327  void *flush_dstart = nullptr, *flush_lstart = nullptr;
328  size_t flush_dlen = 0, flush_llen = 0;
329  MetaHeader shadow_header = *META_HEADER;
// Bytes of payload between the last persisted position and the end of the
// latest entry.
331  flush_dlen = (LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ofst + LOG_ENTRY_AT(CURR_LOG_IDX)->fields.dlen - NEXT_LOG_ENTRY_PERS->fields.ofst);
332  // flush data
// msync needs a page-aligned start, so the start is rounded down and the
// length grows by the offset within the page.
333  flush_dstart = ALIGN_TO_PAGE(NEXT_DATA_PERS);
334  flush_dlen += ((int64_t)NEXT_DATA_PERS) % PAGE_SIZE;
335  // flush log
336  flush_lstart = ALIGN_TO_PAGE(NEXT_LOG_ENTRY_PERS);
337  flush_llen = ((size_t)NEXT_LOG_ENTRY - (size_t)NEXT_LOG_ENTRY_PERS) + ((int64_t)NEXT_LOG_ENTRY_PERS) % PAGE_SIZE;
338  }
339  if(NUM_USED_SLOTS > 0) {
340  //get the latest flushed version
341  //ver_ret = LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ver;
342  ver_ret = META_HEADER->fields.ver;
343  }
// The read/write lock is dropped before the msync calls below; they work from
// the shadowed values captured above.
344  if(!preLocked) {
345  FPL_UNLOCK;
346  }
347  if(flush_dlen > 0) {
348  if(msync(flush_dstart, flush_dlen, MS_SYNC) != 0) {
349  throw PERSIST_EXP_MSYNC(errno);
350  }
351  }
352  if(flush_llen > 0) {
353  if(msync(flush_lstart, flush_llen, MS_SYNC) != 0) {
354  throw PERSIST_EXP_MSYNC(errno);
355  }
356  }
357  // flush meta data
358  this->persistMetaHeaderAtomically(&shadow_header);
359  } catch(uint64_t e) {
360  if(!preLocked) {
362  }
363  throw e;
364  }
365  dbg_default_trace("{0} flush data,log,and meta...done.", this->m_sName);
366 
367  if(!preLocked) {
369  }
370  return ver_ret;
371 }
372 
373 int64_t FilePersistLog::getLength() noexcept(false) {
374  FPL_RDLOCK;
375  int64_t len = NUM_USED_SLOTS;
376  FPL_UNLOCK;
377 
378  return len;
379 }
380 
381 int64_t FilePersistLog::getEarliestIndex() noexcept(false) {
382  FPL_RDLOCK;
383  int64_t idx = (NUM_USED_SLOTS == 0) ? INVALID_INDEX : META_HEADER->fields.head;
384  FPL_UNLOCK;
385  return idx;
386 }
387 
388 int64_t FilePersistLog::getLatestIndex() noexcept(false) {
389  FPL_RDLOCK;
390  int64_t idx = CURR_LOG_IDX;
391  FPL_UNLOCK;
392  return idx;
393 }
394 
395 version_t FilePersistLog::getEarliestVersion() noexcept(false) {
396  FPL_RDLOCK;
397  int64_t idx = (NUM_USED_SLOTS == 0) ? INVALID_INDEX : META_HEADER->fields.head;
398  version_t ver = (idx == INVALID_INDEX) ? INVALID_VERSION : (LOG_ENTRY_AT(idx)->fields.ver);
399  FPL_UNLOCK;
400  return ver;
401 }
402 
403 version_t FilePersistLog::getLatestVersion() noexcept(false) {
404  FPL_RDLOCK;
405  int64_t idx = CURR_LOG_IDX;
406  version_t ver = (idx == INVALID_INDEX) ? INVALID_VERSION : (LOG_ENTRY_AT(idx)->fields.ver);
407  FPL_UNLOCK;
408  return ver;
409 }
410 
// Return the version recorded in the persisted copy of the meta header
// (META_HEADER_PERS).
//
// NOTE(review): the embedded listing numbers skip 414 and 418, directly around
// the read; statements (possibly lock/unlock calls) may be missing from this
// view - confirm against the original source.
411 const version_t FilePersistLog::getLastPersisted() noexcept(false) {
412  version_t last_persisted = INVALID_VERSION;
413  ;
415 
416  last_persisted = META_HEADER_PERS->fields.ver;
417 
419  return last_persisted;
420 }
421 
422 int64_t FilePersistLog::getVersionIndex(const version_t& ver) {
423  FPL_RDLOCK;
424 
425  //binary search
426  dbg_default_trace("{0} - begin binary search.", this->m_sName);
427  int64_t l_idx = binarySearch<int64_t>(
428  [&](const LogEntry* ple) {
429  return ple->fields.ver;
430  },
431  ver,
432  META_HEADER->fields.head,
433  META_HEADER->fields.tail);
434  dbg_default_trace("{0} - end binary search.", this->m_sName);
435 
436  FPL_UNLOCK;
437 
438  dbg_default_trace("{0} getVersionIndex({1}) at index {2}", this->m_sName, ver, l_idx);
439 
440  return l_idx;
441 }
442 
443 const void* FilePersistLog::getEntryByIndex(const int64_t& eidx) noexcept(false) {
444  FPL_RDLOCK;
445  dbg_default_trace("{0}-getEntryByIndex-head:{1},tail:{2},eidx:{3}",
446  this->m_sName, META_HEADER->fields.head, META_HEADER->fields.tail, eidx);
447 
448  int64_t ridx = (eidx < 0) ? (META_HEADER->fields.tail + eidx) : eidx;
449 
450  if(META_HEADER->fields.tail <= ridx || ridx < META_HEADER->fields.head) {
451  FPL_UNLOCK;
452  throw PERSIST_EXP_INV_ENTRY_IDX(eidx);
453  }
454  FPL_UNLOCK;
455 
456  dbg_default_trace("{0} getEntryByIndex at idx:{1} ver:{2} time:({3},{4})",
457  this->m_sName,
458  ridx,
459  (int64_t)(LOG_ENTRY_AT(ridx)->fields.ver),
460  (LOG_ENTRY_AT(ridx))->fields.hlc_r,
461  (LOG_ENTRY_AT(ridx))->fields.hlc_l);
462 
463  return LOG_ENTRY_DATA(LOG_ENTRY_AT(ridx));
464 }
465 
478 /*
479  template<typename TKey,typename KeyGetter>
480  int64_t FilePersistLog::binarySearch(const KeyGetter & keyGetter, const TKey & key,
481  const int64_t & logHead, const int64_t & logTail) noexcept(false) {
482  if (logTail <= logHead) {
483  dbg_default_trace("binary Search failed...EMPTY LOG");
484  return (int64_t)-1L;
485  }
486  int64_t head = logHead, tail = logTail - 1;
487  int64_t pivot = 0;
488  while (head <= tail) {
489  pivot = (head + tail)/2;
490  dbg_default_trace("Search range: {0}->[{1},{2}]",pivot,head,tail);
491  const TKey p_key = keyGetter(LOG_ENTRY_AT(pivot));
492  if (p_key == key) {
493  break; // found
494  } else if (p_key < key) {
495  if (pivot + 1 >= logTail) {
496  break; // found - the last element
497  } else if (keyGetter(pivot+1) > key) {
498  break; // found - the next one is greater than key
499  } else { // search right
500  head = pivot + 1;
501  }
502  } else { // search left
503  tail = pivot - 1;
504  if (head > tail) {
505  dbg_default_trace("binary Search failed...Object does not exist.");
506  return (int64_t)-1L;
507  }
508  }
509  }
510  return pivot;
511  }
512 */
513 
514 const void* FilePersistLog::getEntry(const int64_t& ver) noexcept(false) {
515  LogEntry* ple = nullptr;
516 
517  FPL_RDLOCK;
518 
519  //binary search
520  dbg_default_trace("{0} - begin binary search.", this->m_sName);
521  int64_t l_idx = binarySearch<int64_t>(
522  [&](const LogEntry* ple) {
523  return ple->fields.ver;
524  },
525  ver,
526  META_HEADER->fields.head,
527  META_HEADER->fields.tail);
528  ple = (l_idx == -1) ? nullptr : LOG_ENTRY_AT(l_idx);
529  dbg_default_trace("{0} - end binary search.", this->m_sName);
530 
531  FPL_UNLOCK;
532 
533  // no object exists before the requested timestamp.
534  if(ple == nullptr) {
535  return nullptr;
536  }
537 
538  dbg_default_trace("{0} getEntry at ({1},{2})", this->m_sName, ple->fields.hlc_r, ple->fields.hlc_l);
539 
540  return LOG_ENTRY_DATA(ple);
541 }
542 
543 int64_t FilePersistLog::getHLCIndex(const HLC& rhlc) noexcept(false) {
544  FPL_RDLOCK;
545  dbg_default_trace("getHLCIndex for hlc({0},{1})", rhlc.m_rtc_us, rhlc.m_logic);
546  struct hlc_index_entry skey(rhlc, 0);
547  auto key = this->hidx.upper_bound(skey);
548  FPL_UNLOCK;
549 
550  if(key != this->hidx.begin() && this->hidx.size() > 0) {
551  dbg_default_trace("getHLCIndex returns: hlc:({0},{1}),idx:{2}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
552  return key->log_idx;
553  }
554 
555  // no object exists before the requested timestamp.
556 
557  dbg_default_trace("{0} getHLCIndex found no entry at ({1},{2})", this->m_sName, rhlc.m_rtc_us, rhlc.m_logic);
558 
559  return INVALID_INDEX;
560 }
561 
562 const void* FilePersistLog::getEntry(const HLC& rhlc) noexcept(false) {
563  LogEntry* ple = nullptr;
564  // unsigned __int128 key = ((((unsigned __int128)rhlc.m_rtc_us)<<64) | rhlc.m_logic);
565 
566  FPL_RDLOCK;
567 
568  // We do not user binary search any more.
569  // //binary search
570  // int64_t head = META_HEADER->fields.head % MAX_LOG_ENTRY;
571  // int64_t tail = META_HEADER->fields.tail % MAX_LOG_ENTRY;
572  // if (tail < head) tail += MAX_LOG_ENTRY; //because we mapped it twice
573  // dbg_default_trace("{0} - begin binary search.",this->m_sName);
574  // int64_t l_idx = binarySearch<unsigned __int128>(
575  // [&](int64_t idx){
576  // return ((((unsigned __int128)LOG_ENTRY_AT(idx)->fields.hlc_r)<<64) | LOG_ENTRY_AT(idx)->fields.hlc_l);
577  // },
578  // key,head,tail);
579  // dbg_default_trace("{0} - end binary search.",this->m_sName);
580  // ple = (l_idx == -1) ? nullptr : LOG_ENTRY_AT(l_idx);
581  dbg_default_trace("getEntry for hlc({0},{1})", rhlc.m_rtc_us, rhlc.m_logic);
582  struct hlc_index_entry skey(rhlc, 0);
583  auto key = this->hidx.upper_bound(skey);
584  FPL_UNLOCK;
585 
586 #ifndef NDEBUG
587  dbg_default_trace("hidx.size = {}", this->hidx.size());
588  if(key == this->hidx.end())
589  dbg_default_trace("found upper bound = hidx.end()");
590  else
591  dbg_default_trace("found upper bound = hlc({},{}),idx{}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
592 #endif //NDEBUG
593 
594  if(key != this->hidx.begin() && this->hidx.size() > 0) {
595  key--;
596  ple = LOG_ENTRY_AT(key->log_idx);
597  dbg_default_trace("getEntry returns: hlc:({0},{1}),idx:{2}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
598  }
599 
600  // no object exists before the requested timestamp.
601  if(ple == nullptr) {
602  return nullptr;
603  }
604 
605  dbg_default_trace("{0} getEntry at ({1},{2})", this->m_sName, ple->fields.hlc_r, ple->fields.hlc_l);
606 
607  return LOG_ENTRY_DATA(ple);
608 }
609 
610 // trim by index
// Drop every entry up to and including index `idx` by moving the header's
// head to idx + 1, then persist the new header.
//
// An index outside [head, tail) is ignored silently. The range is checked
// once under the read lock and again after taking the write lock, since the
// header may change between the two.
//
// Rethrows any uint64_t raised by persist() after releasing the write lock.
//
// NOTE(review): the embedded listing numbers skip 621, 626, 634 and 640,
// each next to a lock or unlock; statements may be missing from this view -
// confirm against the original source.
611 void FilePersistLog::trimByIndex(const int64_t& idx) noexcept(false) {
612  dbg_default_trace("{0} trim at index: {1}", this->m_sName, idx);
613  FPL_RDLOCK;
614  // validate check
615  if(idx < META_HEADER->fields.head || idx >= META_HEADER->fields.tail) {
616  FPL_UNLOCK;
617  return;
618  }
619  FPL_UNLOCK;
620 
622  FPL_WRLOCK;
623  //validate check again
624  if(idx < META_HEADER->fields.head || idx >= META_HEADER->fields.tail) {
625  FPL_UNLOCK;
627  return;
628  }
629  META_HEADER->fields.head = idx + 1;
630  try {
// true: the locks are already held by this function
631  persist(true);
632  } catch(uint64_t e) {
633  FPL_UNLOCK;
635  throw e;
636  }
637  //TODO: remove entry from index...this is tricky because HLC
638  // order does not agree with index order.
639  FPL_UNLOCK;
641  // throw PERSIST_EXP_UNIMPLEMENTED;
642  dbg_default_trace("{0} trim at index: {1}...done", this->m_sName, idx);
643 }
644 
645 void FilePersistLog::trim(const int64_t& ver) noexcept(false) {
646  dbg_default_trace("{0} trim at version: {1}", this->m_sName, ver);
647  this->trim<int64_t>(ver,
648  [&](const LogEntry* ple) { return ple->fields.ver; });
649  dbg_default_trace("{0} trim at version: {1}...done", this->m_sName, ver);
650 }
651 
// Trim the log at an HLC timestamp. As shown here the function only emits the
// two trace messages; the former implementation is commented out because HLC
// order does not agree with index order.
//
// NOTE(review): the embedded listing number 661 is skipped right after the
// TODO; a statement may be missing from this view - confirm against the
// original source.
652 void FilePersistLog::trim(const HLC& hlc) noexcept(false) {
653  dbg_default_trace("{0} trim at time: {1}.{2}", this->m_sName, hlc.m_rtc_us, hlc.m_logic);
654  // this->trim<unsigned __int128>(
655  // ((((const unsigned __int128)hlc.m_rtc_us)<<64) | hlc.m_logic),
656  // [&](int64_t idx) {
657  // return ((((const unsigned __int128)LOG_ENTRY_AT(idx)->fields.hlc_r)<<64) |
658  // LOG_ENTRY_AT(idx)->fields.hlc_l);
659  // });
660  //TODO: This is hard because HLC order does not agree with index order.
662  dbg_default_trace("{0} trim at time: {1}.{2}...done", this->m_sName, hlc.m_rtc_us, hlc.m_logic);
663 }
664 
665 void FilePersistLog::persistMetaHeaderAtomically(MetaHeader* pShadowHeader) noexcept(false) {
666  // STEP 1: get file name
667  const string swpFile = this->m_sMetaFile + "." + SWAP_FILE_SUFFIX;
668 
669  // STEP 2: write current meta header to swap file
670  int fd = open(swpFile.c_str(), O_RDWR | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH);
671  if(fd == -1) {
672  throw PERSIST_EXP_OPEN_FILE(errno);
673  }
674  ssize_t nWrite = write(fd, pShadowHeader, sizeof(MetaHeader));
675  if(nWrite != sizeof(MetaHeader)) {
676  throw PERSIST_EXP_WRITE_FILE(errno);
677  }
678  close(fd);
679 
680  // STEP 3: atomically update the meta file
681  if(rename(swpFile.c_str(), this->m_sMetaFile.c_str()) != 0) {
682  throw PERSIST_EXP_RENAME_FILE(errno);
683  }
684 
685  // STEP 4: update the persisted header in memory
686  *META_HEADER_PERS = *pShadowHeader;
687 }
688 
689 int64_t FilePersistLog::getMinimumIndexBeyondVersion(const int64_t& ver) noexcept(false) {
690  int64_t rIndex = INVALID_INDEX;
691 
692  dbg_default_trace("{0}[{1}] - request version {2}", this->m_sName, __func__, ver);
693 
694  if(NUM_USED_SLOTS == 0) {
695  dbg_default_trace("{0}[{1}] - request on an empty log, return INVALID_INDEX.", this->m_sName, __func__);
696  return rIndex;
697  }
698 
699  if(ver == INVALID_VERSION) {
700  dbg_default_trace("{0}[{1}] - request all logs", this->m_sName, __func__);
701  // return the earliest log we have.
702  return META_HEADER->fields.head;
703  }
704 
705  // binary search
706  dbg_default_trace("{0}[{1}] - begin binary search.", this->m_sName, __func__);
707  int64_t l_idx = binarySearch<int64_t>(
708  [&](const LogEntry* ple) {
709  return ple->fields.ver;
710  },
711  ver,
712  META_HEADER->fields.head,
713  META_HEADER->fields.tail);
714 
715  if(l_idx == -1) {
716  // if binary search failed, it means the requested version is earlier
717  // than the earliest available log so we return the earliest log entry
718  // we have.
719  rIndex = META_HEADER->fields.head;
720  dbg_default_trace("{0}[{1}] - binary search failed, return the earliest version {2}", this->m_sName, __func__, ver);
721  } else if((l_idx + 1) == META_HEADER->fields.tail) {
722  // if binary search found the last one, it means ver is in the future return INVALID_INDEX.
723  // use the default rIndex value (INVALID_INDEX)
724  dbg_default_trace("{0}[{1}] - binary search returns the last entry in the log. return INVALID_INDEX.", this->m_sName, __func__);
725  } else {
726  // binary search found some entry earlier than the last one. return l_idx+1:
727  dbg_default_trace("{0}[{1}] - binary search returns an entry earlier than the last one, return ldx+1:{2}", this->m_sName, __func__, l_idx + 1);
728  rIndex = l_idx + 1;
729  }
730 
731  return rIndex;
732 }
733 
734 // format for the logs:
735 // [latest_version(int64_t)][nr_log_entry(int64_t)][log_entry1][log_entry2]...
736 // the log entries are ordered from the earliest to the latest.
737 // four functions for serialization/deserialization of log entries:
738 // 1) size_t byteSizeOfLogEntry(const LogEntry * ple);
739 // 2) size_t writeLogEntryToByteArray(const LogEntry * ple, char * ba);
740 // 3) size_t postLogEntry(const std::function<void (char const *const, std::size_t)> f, const LogEntry *ple);
741 // 4) size_t mergeLogEntryFromByteArray(const char * ba);
742 size_t FilePersistLog::bytes_size(const int64_t& ver) noexcept(false) {
743  size_t bsize = (sizeof(int64_t) + sizeof(int64_t));
744  int64_t idx = this->getMinimumIndexBeyondVersion(ver);
745  if(idx != INVALID_INDEX) {
746  while(idx < META_HEADER->fields.tail) {
747  bsize += byteSizeOfLogEntry(LOG_ENTRY_AT(idx));
748  idx++;
749  }
750  }
751  return bsize;
752 }
753 
754 size_t FilePersistLog::to_bytes(char* buf, const int64_t& ver) noexcept(false) {
755  int64_t idx = this->getMinimumIndexBeyondVersion(ver);
756  size_t ofst = 0;
757  // latest_version
758  int64_t latest_version = this->getLatestVersion();
759  *(int64_t*)(buf + ofst) = latest_version;
760  ofst += sizeof(int64_t);
761  // nr_log_entry
762  *(int64_t*)(buf + ofst) = (idx == INVALID_INDEX) ? 0 : (META_HEADER->fields.tail - idx);
763  ofst += sizeof(int64_t);
764  // log_entries
765  if(idx != INVALID_INDEX) {
766  while(idx < META_HEADER->fields.tail) {
767  ofst += writeLogEntryToByteArray(LOG_ENTRY_AT(idx), buf + ofst);
768  idx++;
769  }
770  }
771  return ofst;
772 }
773 
774 void FilePersistLog::post_object(const std::function<void(char const* const, std::size_t)>& f,
775  const int64_t& ver) noexcept(false) {
776  int64_t idx = this->getMinimumIndexBeyondVersion(ver);
777  // latest_version
778  int64_t latest_version = this->getLatestVersion();
779  f((char*)&latest_version, sizeof(int64_t));
780  // nr_log_entry
781  int64_t nr_log_entry = (idx == INVALID_INDEX) ? 0 : (META_HEADER->fields.tail - idx);
782  f((char*)&nr_log_entry, sizeof(int64_t));
783  // log_entries
784  if(idx != INVALID_INDEX) {
785  while(idx < META_HEADER->fields.tail) {
786  postLogEntry(f, LOG_ENTRY_AT(idx));
787  idx++;
788  }
789  }
790 }
791 
792 void FilePersistLog::applyLogTail(char const* v) noexcept(false) {
793  size_t ofst = 0;
794  // latest_version
795  int64_t latest_version = *(const int64_t*)(v + ofst);
796  ofst += sizeof(int64_t);
797  // nr_log_entry
798  int64_t nr_log_entry = *(const int64_t*)(v + ofst);
799  ofst += sizeof(int64_t);
800  // log_entries
801  while(nr_log_entry--) {
802  ofst += mergeLogEntryFromByteArray(v + ofst);
803  }
804  // update the latest version.
805  META_HEADER->fields.ver = latest_version;
806 }
807 
808 size_t FilePersistLog::byteSizeOfLogEntry(const LogEntry* ple) noexcept(false) {
809  return sizeof(LogEntry) + ple->fields.dlen;
810 }
811 
812 size_t FilePersistLog::writeLogEntryToByteArray(const LogEntry* ple, char* ba) noexcept(false) {
813  size_t nr_written = 0;
814  memcpy(ba, ple, sizeof(LogEntry));
815  nr_written += sizeof(LogEntry);
816  if(ple->fields.dlen > 0) {
817  memcpy((void*)(ba + nr_written), (void*)LOG_ENTRY_DATA(ple), ple->fields.dlen);
818  nr_written += ple->fields.dlen;
819  }
820  return nr_written;
821 }
822 
823 size_t FilePersistLog::postLogEntry(const std::function<void(char const* const, std::size_t)>& f, const LogEntry* ple) noexcept(false) {
824  size_t nr_written = 0;
825  f((const char*)ple, sizeof(LogEntry));
826  nr_written += sizeof(LogEntry);
827  if(ple->fields.dlen > 0) {
828  f((const char*)LOG_ENTRY_DATA(ple), ple->fields.dlen);
829  nr_written += ple->fields.dlen;
830  }
831  return nr_written;
832 }
833 
834 size_t FilePersistLog::mergeLogEntryFromByteArray(const char* ba) noexcept(false) {
835  const LogEntry* cple = (const LogEntry*)ba;
836  // valid check
837  // 0) version grows monotonically.
838  if(cple->fields.ver <= META_HEADER->fields.ver) {
839  dbg_default_trace("{0} skip log entry version {1}, we are at {2}.", __func__, cple->fields.ver, META_HEADER->fields.ver);
840  return cple->fields.dlen + sizeof(LogEntry);
841  }
842  // 1) do we have space to merge it?
843  if(NUM_FREE_SLOTS == 0) {
844  dbg_default_trace("{0} failed to merge log entry, we don't empty log entry.", __func__);
846  }
847  if(NUM_FREE_BYTES < cple->fields.dlen) {
848  dbg_default_trace("{0} failed to merge log entry, we need {1} bytes data space, but we have only {2} bytes.", __func__, cple->fields.dlen, NUM_FREE_BYTES);
850  }
851  // 2) merge it!
852  memcpy(NEXT_DATA, (const void*)(ba + sizeof(LogEntry)), cple->fields.dlen);
853  memcpy(NEXT_LOG_ENTRY, cple, sizeof(LogEntry));
854  NEXT_LOG_ENTRY->fields.ofst = NEXT_DATA_OFST;
855  this->hidx.insert(hlc_index_entry{HLC{cple->fields.hlc_r, cple->fields.hlc_l}, META_HEADER->fields.tail});
856  META_HEADER->fields.tail++;
857  META_HEADER->fields.ver = cple->fields.ver;
858  dbg_default_trace("{0} merge log:log entry and meta data are updated.", __func__);
859  return cple->fields.dlen + sizeof(LogEntry);
860 }
862 // invisible to outside //
864 /* -- moved to util.hpp
865  void checkOrCreateDir(const string & dirPath)
866  noexcept(false) {
867  struct stat sb;
868  if (stat(dirPath.c_str(),&sb) == 0) {
869  if (! S_ISDIR(sb.st_mode)) {
870  throw PERSIST_EXP_INV_PATH;
871  }
872  } else {
873  // create it
874  if (mkdir(dirPath.c_str(),0700) != 0) {
875  throw PERSIST_EXP_CREATE_PATH(errno);
876  }
877  }
878  }
879 
880  bool checkOrCreateFileWithSize(const string & file, uint64_t size)
881  noexcept(false) {
882  bool bCreate = false;
883  struct stat sb;
884  int fd;
885 
886  if (stat(file.c_str(),&sb) == 0) {
887  if(! S_ISREG(sb.st_mode)) {
888  throw PERSIST_EXP_INV_FILE;
889  }
890  } else {
891  // create it
892  bCreate = true;
893  }
894 
895  fd = open(file.c_str(), O_RDWR|O_CREAT,S_IWUSR|S_IRUSR|S_IRGRP|S_IWGRP|S_IROTH);
896  if (fd < 0) {
897  throw PERSIST_EXP_CREATE_FILE(errno);
898  }
899 
900  if (ftruncate(fd,size) != 0) {
901  throw PERSIST_EXP_TRUNCATE_FILE(errno);
902  }
903  close(fd);
904  return bCreate;
905  }
906 */
907 bool FilePersistLog::checkOrCreateMetaFile() noexcept(false) {
908  return checkOrCreateFileWithSize(this->m_sMetaFile, META_SIZE);
909 }
910 
911 bool FilePersistLog::checkOrCreateLogFile() noexcept(false) {
912  return checkOrCreateFileWithSize(this->m_sLogFile, MAX_LOG_SIZE);
913 }
914 
915 bool FilePersistLog::checkOrCreateDataFile() noexcept(false) {
916  return checkOrCreateFileWithSize(this->m_sDataFile, MAX_DATA_SIZE);
917 }
918 
// Cut the log back to version `ver`: entries after the one binarySearch()
// selects for `ver` are dropped by moving the header's tail, the header
// version is lowered to `ver` if it was higher, and the header is persisted.
//
// When the search reports -1 the tail is set to the head, i.e. the log
// becomes empty (see the TODO below about trimmed logs).
//
// Rethrows any uint64_t raised by persistMetaHeaderAtomically() after
// releasing the write lock.
//
// NOTE(review): the embedded listing numbers skip 946, 950 and 954, next to
// the persist step; statements may be missing from this view - confirm
// against the original source.
919 void FilePersistLog::truncate(const int64_t& ver) noexcept(false) {
920  dbg_default_trace("{0} truncate at version: {1}.", this->m_sName, ver);
921  FPL_WRLOCK;
922  // STEP 1: search for the log entry
923  // TODO
924  //binary search
// head/tail are reduced modulo MAX_LOG_ENTRY; a wrapped tail is shifted up by
// MAX_LOG_ENTRY so that head <= tail holds for the search range.
925  int64_t head = META_HEADER->fields.head % MAX_LOG_ENTRY;
926  int64_t tail = META_HEADER->fields.tail % MAX_LOG_ENTRY;
927  if(tail < head) tail += MAX_LOG_ENTRY;
928  dbg_default_trace("{0} - begin binary search.", this->m_sName);
929  int64_t l_idx = binarySearch<int64_t>(
930  [&](const LogEntry* ple) {
931  return ple->fields.ver;
932  },
933  ver, head, tail);
934  dbg_default_trace("{0} - end binary search.", this->m_sName);
935  // STEP 2: update META_HEADER
936  if(l_idx == -1) { // not adequate log found. We need to remove all logs.
937  // TODO: this may not be safe in case the log has been trimmed beyond 'ver' !!!
938  META_HEADER->fields.tail = META_HEADER->fields.head;
939  } else {
// translate the reduced index back into the header's index space
940  int64_t _idx = (META_HEADER->fields.head + l_idx - head) + ((head > l_idx) ? MAX_LOG_ENTRY : 0);
941  META_HEADER->fields.tail = _idx + 1;
942  }
943  if(META_HEADER->fields.ver > ver)
944  META_HEADER->fields.ver = ver;
945  // STEP 3: update PERSISTENT STATE
947  try {
948  persistMetaHeaderAtomically(META_HEADER);
949  } catch(uint64_t e) {
951  FPL_UNLOCK;
952  throw e;
953  }
955  FPL_UNLOCK;
956  dbg_default_trace("{0} truncate at version: {1}....done", this->m_sName, ver);
957 }
958 
959 const uint64_t FilePersistLog::getMinimumLatestPersistedVersion(const std::string& prefix) {
960  // STEP 1: list all meta files in the path
961  DIR* dir = opendir(getPersFilePath().c_str());
962  if(dir == NULL) {
963  // We cannot open the persistent directory, so just return error.
964  dbg_default_error("{}:{} failed to open the directory. errno={}, err={}.",
965  __FILE__, __func__, errno, strerror(errno));
966  return INVALID_VERSION;
967  }
968  // STEP 2: get through the meta header for the minimum
969  struct dirent* dent;
970  bool found = false;
971  int64_t ver = INVALID_VERSION;
972  while((dent = readdir(dir)) != NULL) {
973  uint32_t name_len = strlen(dent->d_name);
974  if(name_len > prefix.length() && strncmp(prefix.c_str(), dent->d_name, prefix.length()) == 0 && strncmp("." META_FILE_SUFFIX, dent->d_name + name_len - strlen(META_FILE_SUFFIX) - 1, strlen(META_FILE_SUFFIX) + 1) == 0) {
975  MetaHeader mh;
976  char fn[1024];
977  sprintf(fn, "%s/%s", getPersFilePath().c_str(), dent->d_name);
978  int fd = open(fn, O_RDONLY);
979  if(fd < 0) {
980  dbg_default_warn("{}:{} cannot read file:{}, errno={}, err={}.",
981  __FILE__, __func__, errno, strerror(errno));
982  continue;
983  }
984  int nRead = read(fd, (void*)&mh, sizeof(mh));
985  if(nRead != sizeof(mh)) {
986  dbg_default_warn("{}:{} cannot load meta header from file:{}, errno={}, err={}",
987  __FILE__, __func__, errno, strerror(errno));
988  close(fd);
989  continue;
990  }
991  close(fd);
992  if(!found || ver > mh.fields.ver)
993  ver = mh.fields.ver;
994  }
995  }
996  return ver;
997 }
998 } // namespace persistent
#define META_SIZE
#define NEXT_DATA_OFST
#define PERSIST_EXP_WRITE_FILE(x)
#define CURR_LOG_IDX
#define NEXT_DATA_PERS
This file includes all common types internal to derecho and not necessarily known by a client pr...
#define FPL_PERS_LOCK
union persistent::log_entry LogEntry
#define ALIGN_TO_PAGE(x)
#define FPL_RDLOCK
#define CONF_PERS_RESET
Definition: conf.hpp:51
#define PERSIST_EXP_OPEN_FILE(x)
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
Get the minimum latest persisted version for a Replicated<T> identified by
STL namespace.
#define LOG_ENTRY_DATA(e)
#define CONF_PERS_MAX_DATA_SIZE
Definition: conf.hpp:53
int64_t log_idx
Definition: PersistLog.hpp:34
#define dbg_default_debug(...)
Definition: logger.hpp:42
#define dbg_default_error(...)
Definition: logger.hpp:48
#define PERSIST_EXP_NOSPACE_LOG
HLC hlc
Definition: PersistLog.hpp:33
auto bytes_size(const T &)
Just calls sizeof(T)
#define FPL_UNLOCK
std::string getPersFilePath()
Definition: util.hpp:36
#define INVALID_INDEX
Definition: PersistLog.hpp:29
#define DATA_FILE_SUFFIX
bool checkOrCreateFileWithSize(const std::string &file, uint64_t size) noexcept(false)
Definition: util.hpp:75
#define FPL_WRLOCK
#define LOG_ENTRY_AT(idx)
#define SWAP_FILE_SUFFIX
#define NEXT_LOG_ENTRY
#define FPL_PERS_UNLOCK
#define NEXT_DATA
#define PERSIST_EXP_REMOVE_FILE(x)
#define NEXT_LOG_ENTRY_PERS
#define NUM_FREE_BYTES
#define PERSIST_EXP_NOSPACE_DATA
#define PERSIST_EXP_MMAP_FILE(x)
#define INVALID_VERSION
Definition: PersistLog.hpp:28
#define META_HEADER
#define PERSIST_EXP_INV_VERSION
#define PERSIST_EXP_RWLOCK_INIT(x)
#define dbg_default_trace(...)
Definition: logger.hpp:40
#define PERSIST_EXP_READ_FILE(x)
Definition: HLC.hpp:7
const bool getConfBoolean(const std::string &key)
Definition: conf.cpp:146
#define META_FILE_SUFFIX
#define MAX_DATA_SIZE
#define NUM_FREE_SLOTS
#define dbg_default_info(...)
Definition: logger.hpp:44
#define LOG_FILE_SUFFIX
#define PERSIST_EXP_UNIMPLEMENTED
#define META_HEADER_PERS
#define CONF_PERS_MAX_LOG_ENTRY
Definition: conf.hpp:52
void checkOrCreateDir(const std::string &dirPath) noexcept(false)
Definition: util.hpp:43
#define PAGE_SIZE
#define PERSIST_EXP_MSYNC(x)
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
#define NUM_USED_SLOTS
#define PERSIST_EXP_INV_ENTRY_IDX(x)
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
#define MAX_LOG_ENTRY
#define PERSIST_EXP_RENAME_FILE(x)
const uint64_t getConfUInt64(const std::string &key)
Definition: conf.cpp:134
#define PERSIST_EXP_MUTEX_INIT(x)
Definition: PersistLog.hpp:32
#define MAX_LOG_SIZE
uint64_t m_logic
Definition: HLC.hpp:13
uint64_t m_rtc_us
Definition: HLC.hpp:12
struct persistent::log_entry::@1 fields
#define __DO_VALIDATION
#define dbg_default_warn(...)
Definition: logger.hpp:46