Derecho  0.9
Distributed systems toolkit for RDMA
FilePersistLog.hpp
Go to the documentation of this file.
1 #ifndef FILE_PERSIST_LOG_HPP
2 #define FILE_PERSIST_LOG_HPP
3 
4 #include "PersistLog.hpp"
5 #include "util.hpp"
7 #include <pthread.h>
8 #include <string>
9 
10 namespace persistent {
11 
// Suffix strings for the files backing a log: meta header, log entries, data
// and a swap file.
// NOTE(review): presumably combined with the log name to build m_sMetaFile /
// m_sLogFile / m_sDataFile; the code that builds those paths is not in this
// header - confirm against the implementation file.
12 #define META_FILE_SUFFIX "meta"
13 #define LOG_FILE_SUFFIX "log"
14 #define DATA_FILE_SUFFIX "data"
15 #define SWAP_FILE_SUFFIX "swp"
16 
/**
 * Meta header format.
 *
 * The named fields overlay the beginning of a fixed 256-byte record
 * (`bytes`), so sizeof(MetaHeader) stays 256 no matter how many fields are in
 * use.
 */
typedef union meta_header {
    struct {
        int64_t head;  // the head index
        int64_t tail;  // the tail index
        int64_t ver;   // the latest version number.
        // uint64_t d_head;  // the data head offset
        // uint64_t d_tail;  // the data tail offset
    } fields;
    uint8_t bytes[256];

    /**
     * Field-wise equality: two headers are equal when head, tail and ver
     * match. The remaining bytes of the 256-byte record are not compared.
     *
     * Declared const so that const headers (and const references to headers)
     * can be compared.
     *
     * @param other the header to compare against
     * @return true when all three fields are equal
     */
    bool operator==(const union meta_header& other) const {
        return (this->fields.head == other.fields.head)
               && (this->fields.tail == other.fields.tail)
               && (this->fields.ver == other.fields.ver);
    }
} MetaHeader;
31 
/**
 * Log entry format.
 *
 * Each entry is a fixed 64-byte record (`bytes`); the named fields overlay the
 * beginning of that record.
 */
union log_entry {
    struct {
        int64_t ver;     // version of the data
        uint64_t dlen;   // length of the data
        uint64_t ofst;   // offset of the data in the memory buffer
        uint64_t hlc_r;  // realtime component of hlc
        uint64_t hlc_l;  // logic component of hlc
    } fields;
    uint8_t bytes[64];
};
typedef union log_entry LogEntry;
43 
44 // Capacity limits of a log. These are per-instance values, not compile-time
45 // constants: the macros below read this->m_iMaxLogEntry and
46 // this->m_iMaxDataSize, so they only expand inside member functions of a
47 // class providing those members. Both values come from the configuration file:
48 // CONF_PERS_MAX_LOG_ENTRY - "PERS/max_log_entry"
49 // CONF_PERS_MAX_DATA_SIZE - "PERS/max_data_size"
// NOTE(review): an earlier version of this comment carried a TODO to make the
// "hard-wired" limits configurable and quoted 1M (2^20-1) log entries and
// 512GB of data. The macros already read configurable members, so that TODO
// looks stale; whether those figures are still the defaults is not visible
// here - confirm.
50 #define MAX_LOG_ENTRY (this->m_iMaxLogEntry)
// size in bytes of MAX_LOG_ENTRY log entries
51 #define MAX_LOG_SIZE (sizeof(LogEntry) * MAX_LOG_ENTRY)
52 #define MAX_DATA_SIZE (this->m_iMaxDataSize)
// size in bytes of one meta header record
53 #define META_SIZE (sizeof(MetaHeader))
54 
// helpers:
// All of the macros below expand to expressions on `this` (m_currMetaHeader,
// m_persMetaHeader, m_pLog, m_pData, ...), so they can only be used inside
// member functions of a class that provides those members.
#define META_HEADER ((MetaHeader*)(&(this->m_currMetaHeader)))
#define META_HEADER_PERS ((MetaHeader*)(&(this->m_persMetaHeader)))
#define LOG_ENTRY_ARRAY ((LogEntry*)(this->m_pLog))

// number of log slots between head and tail of the current meta header
#define NUM_USED_SLOTS (META_HEADER->fields.tail - META_HEADER->fields.head)
// #define NUM_USED_SLOTS_PERS (META_HEADER_PERS->tail - META_HEADER_PERS->head)
#define NUM_FREE_SLOTS (MAX_LOG_ENTRY - 1 - NUM_USED_SLOTS)
// #define NUM_FREE_SLOTS_PERS (MAX_LOG_ENTRY - 1 - NUM_USERD_SLOTS_PERS)

// Slot of the log ring buffer that holds log index `idx`.
// The remainder is used at its natural (64-bit) width: narrowing it to `int`
// would yield a negative offset as soon as the configured number of entries
// exceeds INT_MAX.
#define LOG_ENTRY_AT(idx) (LOG_ENTRY_ARRAY + ((idx) % MAX_LOG_ENTRY))
#define NEXT_LOG_ENTRY LOG_ENTRY_AT(META_HEADER->fields.tail)
#define NEXT_LOG_ENTRY_PERS LOG_ENTRY_AT( \
        MAX(META_HEADER_PERS->fields.tail, META_HEADER->fields.head))
// index of the newest entry, or -1 when the log is empty
#define CURR_LOG_IDX ((NUM_USED_SLOTS == 0) ? -1 : META_HEADER->fields.tail - 1)
// address of the data of entry `e` inside the data ring buffer
#define LOG_ENTRY_DATA(e) ((void*)((uint8_t*)this->m_pData + (e)->fields.ofst % MAX_DATA_SIZE))

// data offset right after the newest entry (0 for an empty log)
#define NEXT_DATA_OFST ((CURR_LOG_IDX == -1) ? 0 : (LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ofst + LOG_ENTRY_AT(CURR_LOG_IDX)->fields.dlen))
// Same byte-pointer arithmetic as LOG_ENTRY_DATA (no pointer -> integer round trip).
#define NEXT_DATA ((void*)((uint8_t*)this->m_pData + NEXT_DATA_OFST % MAX_DATA_SIZE))
#define NEXT_DATA_PERS ((NEXT_LOG_ENTRY > NEXT_LOG_ENTRY_PERS) ? LOG_ENTRY_DATA(NEXT_LOG_ENTRY_PERS) : NULL)

// bytes spanned from the data offset of the head entry to the end of the newest entry
#define NUM_USED_BYTES ((NUM_USED_SLOTS == 0) ? 0 : (LOG_ENTRY_AT(CURR_LOG_IDX)->fields.ofst + LOG_ENTRY_AT(CURR_LOG_IDX)->fields.dlen - LOG_ENTRY_AT(META_HEADER->fields.head)->fields.ofst))
#define NUM_FREE_BYTES (MAX_DATA_SIZE - NUM_USED_BYTES)

#define PAGE_SIZE (getpagesize())
// round address `x` down to the start of its page
#define ALIGN_TO_PAGE(x) ((void*)(((uint64_t)(x)) - ((uint64_t)(x)) % PAGE_SIZE))
82 
83 // declaration for binary search util. see cpp file for comments.
84 template <typename TKey, typename KeyGetter>
85 int64_t binarySearch(const KeyGetter&, const TKey&, const int64_t&, const int64_t&);
86 
87 // FilePersistLog is the default persist Log
88 class FilePersistLog : public PersistLog {
89 protected:
90  // the current meta header
92  // the persisted meta header
94  // path of the data files
95  const std::string m_sDataPath;
96  // full meta file name
97  const std::string m_sMetaFile;
98  // full log file name
99  const std::string m_sLogFile;
100  // full data file name
101  const std::string m_sDataFile;
102  // max number of log entry
103  const uint64_t m_iMaxLogEntry;
104  // max data size
105  const uint64_t m_iMaxDataSize;
106 
107  // the log file descriptor
109  // the data file descriptor
111 
112  // memory mapped Log RingBuffer
113  void* m_pLog;
114  // memory mapped Data RingBuffer
115  void* m_pData;
116  // read/write lock
117  pthread_rwlock_t m_rwlock;
118  // persistent lock
119  pthread_mutex_t m_perslock;
120 
// lock macros
// Each macro expands to a single statement operating on this->m_rwlock or
// this->m_perslock and throws a PERSIST_EXP_* code when the pthread call
// fails. The pthread lock functions report failure through their return value
// and do not set errno, so the returned error number is what gets folded into
// the thrown code.
#define FPL_WRLOCK                                                      \
    do {                                                                \
        const int fpl_rc = pthread_rwlock_wrlock(&this->m_rwlock);      \
        if(fpl_rc != 0) {                                               \
            throw PERSIST_EXP_RWLOCK_WRLOCK(fpl_rc);                    \
        }                                                               \
    } while(0)

// NOTE(review): a failed read lock is reported with the WRLOCK exception code.
// This looks like a copy/paste slip, but no read-lock specific code is visible
// from this header, so the code is kept - confirm against the exception
// definitions.
#define FPL_RDLOCK                                                      \
    do {                                                                \
        const int fpl_rc = pthread_rwlock_rdlock(&this->m_rwlock);      \
        if(fpl_rc != 0) {                                               \
            throw PERSIST_EXP_RWLOCK_WRLOCK(fpl_rc);                    \
        }                                                               \
    } while(0)

#define FPL_UNLOCK                                                      \
    do {                                                                \
        const int fpl_rc = pthread_rwlock_unlock(&this->m_rwlock);      \
        if(fpl_rc != 0) {                                               \
            throw PERSIST_EXP_RWLOCK_UNLOCK(fpl_rc);                    \
        }                                                               \
    } while(0)

#define FPL_PERS_LOCK                                                   \
    do {                                                                \
        const int fpl_rc = pthread_mutex_lock(&this->m_perslock);       \
        if(fpl_rc != 0) {                                               \
            throw PERSIST_EXP_MUTEX_LOCK(fpl_rc);                       \
        }                                                               \
    } while(0)

#define FPL_PERS_UNLOCK                                                 \
    do {                                                                \
        const int fpl_rc = pthread_mutex_unlock(&this->m_perslock);     \
        if(fpl_rc != 0) {                                               \
            throw PERSIST_EXP_MUTEX_UNLOCK(fpl_rc);                     \
        }                                                               \
    } while(0)
156 
157  // load the log from files. This method may throw exceptions if reading from
158  // the files fails.
159  virtual void load() noexcept(false);
160 
161  // reset the logs. This will remove the existing persisted data.
162  virtual void reset() noexcept(false);
163 
164  // Persist the metadata header atomically. The caller is assumed to
165  // already hold FPL_PERS_LOCK.
166  virtual void persistMetaHeaderAtomically(MetaHeader*) noexcept(false);
167 
168 public:
169  //Constructor
170  FilePersistLog(const std::string& name, const std::string& dataPath) noexcept(false);
171  FilePersistLog(const std::string& name) noexcept(false) : FilePersistLog(name, getPersFilePath()){};
172  //Destructor
173  virtual ~FilePersistLog() noexcept(true);
174 
175  //Derived from PersistLog
176  virtual void append(const void* pdata,
177  const uint64_t& size, const int64_t& ver,
178  const HLC& mhlc) noexcept(false);
179  virtual void advanceVersion(const int64_t& ver) noexcept(false);
180  virtual int64_t getLength() noexcept(false);
181  virtual int64_t getEarliestIndex() noexcept(false);
182  virtual int64_t getLatestIndex() noexcept(false);
183  virtual int64_t getVersionIndex(const version_t& ver) noexcept(false);
184  virtual int64_t getHLCIndex(const HLC& hlc) noexcept(false);
185  virtual version_t getEarliestVersion() noexcept(false);
186  virtual version_t getLatestVersion() noexcept(false);
187  virtual const version_t getLastPersisted() noexcept(false);
188  virtual const void* getEntryByIndex(const int64_t& eno) noexcept(false);
189  virtual const void* getEntry(const version_t& ver) noexcept(false);
190  virtual const void* getEntry(const HLC& hlc) noexcept(false);
191  virtual const version_t persist(const bool preLocked = false) noexcept(false);
192  virtual void trimByIndex(const int64_t& eno) noexcept(false);
193  virtual void trim(const version_t& ver) noexcept(false);
194  virtual void trim(const HLC& hlc) noexcept(false);
195  virtual void truncate(const version_t& ver) noexcept(false);
196  virtual size_t bytes_size(const version_t& ver) noexcept(false);
197  virtual size_t to_bytes(char* buf, const version_t& ver) noexcept(false);
198  virtual void post_object(const std::function<void(char const* const, std::size_t)>& f,
199  const version_t& ver) noexcept(false);
200  virtual void applyLogTail(char const* v) noexcept(false);
201 
// Generic trim: look up `key` with binarySearch() over the current
// [head, tail) range, using `keyGetter` to extract the key of a log entry, and,
// if an index is found, move the head to the entry right after it and call
// persist(true). When the search returns -1 the log is left unchanged.
//
// NOTE(review): the embedded listing numbers skip 221, 226 and 229 inside this
// body, so this listing has probably lost statements. persist() is called with
// preLocked == true, yet no FPL_PERS_LOCK / FPL_PERS_UNLOCK is visible here -
// confirm against the original header before changing this function.
202  template <typename TKey, typename KeyGetter>
203  void trim(const TKey& key, const KeyGetter& keyGetter) noexcept(false) {
204  int64_t idx;
205  // RDLOCK for validation
206  FPL_RDLOCK;
207  idx = binarySearch<TKey>(keyGetter, key, META_HEADER->fields.head, META_HEADER->fields.tail);
208  if(idx == -1) {
// nothing to trim: release the read lock and leave
209  FPL_UNLOCK;
210  return;
211  }
212  FPL_UNLOCK;
213  // do binary search again in case some concurrent trim() and
214  // append() happens. TODO: any optimization to avoid the second
215  // search?
216  // WRLOCK for trim
217  FPL_WRLOCK;
218  idx = binarySearch<TKey>(keyGetter, key, META_HEADER->fields.head, META_HEADER->fields.tail);
219  if(idx != -1) {
// drop everything up to and including the entry that was found
220  META_HEADER->fields.head = (idx + 1);
222  try {
223  persist(true);
// NOTE(review): only exceptions of type uint64_t release the write lock before
// propagating; any other exception type thrown by persist() would leave
// m_rwlock write-locked.
224  } catch(uint64_t e) {
225  FPL_UNLOCK;
227  throw e;
228  }
230  // TODO: remove deleted entries from the index. This is tricky because
231  // HLC order and index order do not agree with each other.
232  // throw PERSIST_EXP_UNIMPLEMENTED;
233  } else {
234  FPL_UNLOCK;
235  return;
236  }
237  FPL_UNLOCK;
238  }
239 
245  static const uint64_t getMinimumLatestPersistedVersion(const std::string& prefix);
246 
247 private:
249  bool checkOrCreateMetaFile() noexcept(false);
250 
252  bool checkOrCreateLogFile() noexcept(false);
253 
255  bool checkOrCreateDataFile() noexcept(false);
256 
264  int64_t getMinimumIndexBeyondVersion(const int64_t& ver) noexcept(false);
271  size_t byteSizeOfLogEntry(const LogEntry* ple) noexcept(false);
278  size_t writeLogEntryToByteArray(const LogEntry* ple, char* ba) noexcept(false);
286  size_t postLogEntry(const std::function<void(char const* const, std::size_t)>& f, const LogEntry* ple) noexcept(false);
293  size_t mergeLogEntryFromByteArray(const char* ba) noexcept(false);
294 
307  template <typename TKey, typename KeyGetter>
308  int64_t binarySearch(const KeyGetter& keyGetter, const TKey& key,
309  const int64_t& logHead, const int64_t& logTail) noexcept(false) {
310  if(logTail <= logHead) {
311  dbg_default_trace("binary Search failed...EMPTY LOG");
312  return (int64_t)-1L;
313  }
314  int64_t head = logHead, tail = logTail - 1;
315  int64_t pivot = 0;
316  while(head <= tail) {
317  pivot = (head + tail) / 2;
318  dbg_default_trace("Search range: {0}->[{1},{2}]", pivot, head, tail);
319  const TKey p_key = keyGetter(LOG_ENTRY_AT(pivot));
320  if(p_key == key) {
321  break; // found
322  } else if(p_key < key) {
323  if(pivot + 1 >= logTail) {
324  break; // found - the last element
325  } else if(keyGetter(LOG_ENTRY_AT(pivot + 1)) > key) {
326  break; // found - the next one is greater than key
327  } else { // search right
328  head = pivot + 1;
329  }
330  } else { // search left
331  tail = pivot - 1;
332  if(head > tail) {
333  dbg_default_trace("binary Search failed...Object does not exist.");
334  return (int64_t)-1L;
335  }
336  }
337  }
338  return pivot;
339  }
340 
341 #ifdef DERECHO_DEBUG
342  //dbg functions
343  void dbgDumpMeta() {
344  dbg_default_trace("m_pData={0},m_pLog={1}", (void*)this->m_pData, (void*)this->m_pLog);
345  dbg_default_trace("MEAT_HEADER:head={0},tail={1}", (int64_t)META_HEADER->fields.head, (int64_t)META_HEADER->fields.tail);
346  dbg_default_trace("MEAT_HEADER_PERS:head={0},tail={1}", (int64_t)META_HEADER_PERS->fields.head, (int64_t)META_HEADER_PERS->fields.tail);
347  dbg_default_trace("NEXT_LOG_ENTRY={0},NEXT_LOG_ENTRY_PERS={1}", (void*)NEXT_LOG_ENTRY, (void*)NEXT_LOG_ENTRY_PERS);
348  }
349 #endif //DERECHO_DEBUG
350 };
351 }
352 
353 #endif //FILE_PERSIST_LOG_HPP
const std::string m_sMetaFile
This file includes all common types internal to derecho and not necessarily known by a client pr...
#define FPL_PERS_LOCK
union persistent::log_entry LogEntry
union persistent::meta_header MetaHeader
#define FPL_RDLOCK
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
get the minimum latest persisted version for a Replicated<T> identified by
STL namespace.
int64_t binarySearch(const KeyGetter &keyGetter, const TKey &key, const int64_t &logHead, const int64_t &logTail) noexcept(false)
binary search through the log, return the maximum index of the entries whose key <= ...
bool operator==(const union meta_header &other)
auto bytes_size(const T &)
Just calls sizeof(T)
#define FPL_UNLOCK
std::string getPersFilePath()
Definition: util.hpp:36
#define FPL_WRLOCK
const std::string m_sDataPath
#define LOG_ENTRY_AT(idx)
#define NEXT_LOG_ENTRY
#define FPL_PERS_UNLOCK
#define NEXT_LOG_ENTRY_PERS
#define META_HEADER
#define dbg_default_trace(...)
Definition: logger.hpp:40
Definition: HLC.hpp:7
int64_t binarySearch(const KeyGetter &, const TKey &, const int64_t &, const int64_t &)
void trim(const TKey &key, const KeyGetter &keyGetter) noexcept(false)
struct persistent::meta_header::@0 fields
#define META_HEADER_PERS
const std::string m_sDataFile
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
const std::string m_sLogFile
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.