16 namespace fs = std::filesystem;
18 #include <experimental/filesystem> 19 namespace fs = std::experimental::filesystem;
34 FilePersistLog::FilePersistLog(
const string& name,
const string& dataPath) noexcept(
false) :
PersistLog(name),
35 m_sDataPath(dataPath),
45 if(pthread_rwlock_init(&this->m_rwlock, NULL) != 0) {
48 if(pthread_mutex_init(&this->m_perslock, NULL) != 0) {
59 void FilePersistLog::reset() noexcept(false) {
61 if(fs::exists(this->m_sMetaFile)) {
62 if(!fs::remove(this->m_sMetaFile)) {
63 dbg_default_error(
"{0} reset failed to remove the file:{1}", this->m_sName, this->m_sMetaFile);
66 if(!fs::remove(this->m_sLogFile)) {
67 dbg_default_error(
"{0} reset failed to remove the file:{1}", this->m_sName, this->m_sLogFile);
70 if(!fs::remove(this->m_sDataFile)) {
71 dbg_default_error(
"{0} reset failed to remove the file:{1}", this->m_sName, this->m_sDataFile);
78 void FilePersistLog::load() noexcept(false) {
84 bool bCreate = checkOrCreateMetaFile();
85 checkOrCreateLogFile();
86 checkOrCreateDataFile();
89 this->m_iLogFileDesc = open(this->m_sLogFile.c_str(), O_RDWR);
90 if(this->m_iLogFileDesc == -1) {
93 this->m_iDataFileDesc = open(this->m_sDataFile.c_str(), O_RDWR);
94 if(this->m_iDataFileDesc == -1) {
101 this->m_pLog = mmap(NULL,
MAX_LOG_SIZE << 1, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
102 if(this->m_pLog == MAP_FAILED) {
106 if(mmap(this->m_pLog,
MAX_LOG_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iLogFileDesc, 0) == MAP_FAILED) {
107 dbg_default_error(
"{0}:map ringbuffer space for the first half of log failed. Is the size of log ringbuffer aligned to page?", this->m_sName);
110 if(mmap((
void*)((uint64_t)this->m_pLog +
MAX_LOG_SIZE),
MAX_LOG_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iLogFileDesc, 0) == MAP_FAILED) {
111 dbg_default_error(
"{0}:map ringbuffer space for the second half of log failed. Is the size of log ringbuffer aligned to page?", this->m_sName);
115 this->m_pData = mmap(NULL, (
size_t)(
MAX_DATA_SIZE << 1), PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
116 if(this->m_pData == MAP_FAILED) {
120 if(mmap(this->m_pData, (
size_t)(
MAX_DATA_SIZE), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iDataFileDesc, 0) == MAP_FAILED) {
121 dbg_default_error(
"{0}:map ringbuffer space for the first half of data failed. Is the size of data ringbuffer aligned to page?", this->m_sName);
124 if(mmap((
void*)((uint64_t)this->m_pData +
MAX_DATA_SIZE), (
size_t)
MAX_DATA_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_FIXED, this->m_iDataFileDesc, 0) == MAP_FAILED) {
125 dbg_default_error(
"{0}:map ringbuffer space for the second half of data failed. Is the size of data ringbuffer aligned to page?", this->m_sName);
143 }
catch(uint64_t e) {
155 int fd = open(this->m_sMetaFile.c_str(), O_RDONLY);
167 for(int64_t idx =
META_HEADER->fields.head; idx < META_HEADER->fields.tail; idx++) {
172 this->hidx.insert(_ent);
174 }
catch(uint64_t e) {
194 FilePersistLog::~FilePersistLog() noexcept(true) {
195 pthread_rwlock_destroy(&this->m_rwlock);
196 pthread_mutex_destroy(&this->m_perslock);
197 if(this->m_pData != MAP_FAILED) {
200 this->m_pData =
nullptr;
201 if(this->m_pLog != MAP_FAILED) {
204 this->m_pLog =
nullptr;
205 if(this->m_iLogFileDesc != -1) {
206 close(this->m_iLogFileDesc);
208 if(this->m_iDataFileDesc != -1) {
209 close(this->m_iDataFileDesc);
213 void FilePersistLog::append(
const void* pdat,
const uint64_t& size,
const int64_t& ver,
const HLC& mhlc) noexcept(
false) {
214 dbg_default_trace(
"{0} append event ({1},{2})", this->m_sName, mhlc.m_rtc_us, mhlc.m_logic);
217 #define __DO_VALIDATION \ 219 if(NUM_FREE_SLOTS < 1) { \ 220 dbg_default_error("{0}-append exception no free slots in log! NUM_FREE_SLOTS={1}", \ 221 this->m_sName, NUM_FREE_SLOTS); \ 222 dbg_default_flush(); \ 224 std::cerr << "PERSIST_EXP_NOSPACE_LOG: FREESLOT=" << NUM_FREE_SLOTS << ",version=" << ver << std::endl; \ 225 throw PERSIST_EXP_NOSPACE_LOG; \ 227 if(NUM_FREE_BYTES < size) { \ 228 dbg_default_error("{0}-append exception no space for data: NUM_FREE_BYTES={1}, size={2}", \ 229 this->m_sName, NUM_FREE_BYTES, size); \ 230 dbg_default_flush(); \ 232 std::cerr << "PERSIST_EXP_NOSPACE_DATA: FREE:" << NUM_FREE_BYTES << ",size=" << size << std::endl; \ 233 throw PERSIST_EXP_NOSPACE_DATA; \ 235 if((CURR_LOG_IDX != -1) && (META_HEADER->fields.ver >= ver)) { \ 236 int64_t cver = META_HEADER->fields.ver; \ 237 dbg_default_error("{0}-append version already exists! cur_ver:{1} new_ver:{2}", this->m_sName, \ 238 (int64_t)cver, (int64_t)ver); \ 239 dbg_default_flush(); \ 241 std::cerr << "PERSIST_EXP_INV_VERSION:cver=" << cver << ",ver=" << ver << std::endl; \ 242 throw PERSIST_EXP_INV_VERSION; \ 246 #pragma GCC diagnostic ignored "-Wunused-variable" 248 #pragma GCC diagnostic pop 254 #pragma GCC diagnostic ignored "-Wunused-variable" 256 #pragma GCC diagnostic pop 281 dbg_default_trace(
"{0} append:log entry and meta data are updated.", this->m_sName);
289 ver, mhlc.m_rtc_us, mhlc.m_logic);
293 void FilePersistLog::advanceVersion(
const int64_t& ver) noexcept(
false) {
304 const int64_t FilePersistLog::persist(
const bool preLocked) noexcept(
false) {
327 void *flush_dstart =
nullptr, *flush_lstart =
nullptr;
328 size_t flush_dlen = 0, flush_llen = 0;
348 if(msync(flush_dstart, flush_dlen, MS_SYNC) != 0) {
353 if(msync(flush_lstart, flush_llen, MS_SYNC) != 0) {
358 this->persistMetaHeaderAtomically(&shadow_header);
359 }
catch(uint64_t e) {
373 int64_t FilePersistLog::getLength() noexcept(false) {
381 int64_t FilePersistLog::getEarliestIndex() noexcept(false) {
388 int64_t FilePersistLog::getLatestIndex() noexcept(false) {
395 version_t FilePersistLog::getEarliestVersion() noexcept(false) {
403 version_t FilePersistLog::getLatestVersion() noexcept(false) {
411 const version_t FilePersistLog::getLastPersisted() noexcept(false) {
419 return last_persisted;
422 int64_t FilePersistLog::getVersionIndex(
const version_t& ver) {
427 int64_t l_idx = binarySearch<int64_t>(
429 return ple->fields.ver;
438 dbg_default_trace(
"{0} getVersionIndex({1}) at index {2}", this->m_sName, ver, l_idx);
443 const void* FilePersistLog::getEntryByIndex(
const int64_t& eidx) noexcept(
false) {
448 int64_t ridx = (eidx < 0) ? (
META_HEADER->fields.tail + eidx) : eidx;
450 if(
META_HEADER->fields.tail <= ridx || ridx < META_HEADER->fields.head) {
514 const void* FilePersistLog::getEntry(
const int64_t& ver) noexcept(
false) {
521 int64_t l_idx = binarySearch<int64_t>(
538 dbg_default_trace(
"{0} getEntry at ({1},{2})", this->m_sName, ple->fields.hlc_r, ple->fields.hlc_l);
543 int64_t FilePersistLog::getHLCIndex(
const HLC& rhlc) noexcept(
false) {
547 auto key = this->hidx.upper_bound(skey);
550 if(key != this->hidx.begin() && this->hidx.size() > 0) {
551 dbg_default_trace(
"getHLCIndex returns: hlc:({0},{1}),idx:{2}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
557 dbg_default_trace(
"{0} getHLCIndex found no entry at ({1},{2})", this->m_sName, rhlc.m_rtc_us, rhlc.m_logic);
562 const void* FilePersistLog::getEntry(
const HLC& rhlc) noexcept(
false) {
583 auto key = this->hidx.upper_bound(skey);
588 if(key == this->hidx.end())
591 dbg_default_trace(
"found upper bound = hlc({},{}),idx{}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
594 if(key != this->hidx.begin() && this->hidx.size() > 0) {
597 dbg_default_trace(
"getEntry returns: hlc:({0},{1}),idx:{2}", key->hlc.m_rtc_us, key->hlc.m_logic, key->log_idx);
611 void FilePersistLog::trimByIndex(
const int64_t& idx) noexcept(
false) {
615 if(idx < META_HEADER->fields.head || idx >=
META_HEADER->fields.tail) {
624 if(idx < META_HEADER->fields.head || idx >=
META_HEADER->fields.tail) {
632 }
catch(uint64_t e) {
645 void FilePersistLog::trim(
const int64_t& ver) noexcept(
false) {
647 this->trim<int64_t>(ver,
648 [&](
const LogEntry* ple) {
return ple->fields.ver; });
652 void FilePersistLog::trim(
const HLC& hlc) noexcept(
false) {
653 dbg_default_trace(
"{0} trim at time: {1}.{2}", this->m_sName, hlc.m_rtc_us, hlc.m_logic);
662 dbg_default_trace(
"{0} trim at time: {1}.{2}...done", this->m_sName, hlc.m_rtc_us, hlc.m_logic);
665 void FilePersistLog::persistMetaHeaderAtomically(
MetaHeader* pShadowHeader) noexcept(
false) {
670 int fd = open(swpFile.c_str(), O_RDWR | O_CREAT, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH);
674 ssize_t nWrite = write(fd, pShadowHeader,
sizeof(
MetaHeader));
681 if(rename(swpFile.c_str(), this->m_sMetaFile.c_str()) != 0) {
689 int64_t FilePersistLog::getMinimumIndexBeyondVersion(
const int64_t& ver) noexcept(
false) {
692 dbg_default_trace(
"{0}[{1}] - request version {2}", this->m_sName, __func__, ver);
695 dbg_default_trace(
"{0}[{1}] - request on an empty log, return INVALID_INDEX.", this->m_sName, __func__);
707 int64_t l_idx = binarySearch<int64_t>(
709 return ple->fields.ver;
720 dbg_default_trace(
"{0}[{1}] - binary search failed, return the earliest version {2}", this->m_sName, __func__, ver);
721 }
else if((l_idx + 1) ==
META_HEADER->fields.tail) {
724 dbg_default_trace(
"{0}[{1}] - binary search returns the last entry in the log. return INVALID_INDEX.", this->m_sName, __func__);
727 dbg_default_trace(
"{0}[{1}] - binary search returns an entry earlier than the last one, return ldx+1:{2}", this->m_sName, __func__, l_idx + 1);
743 size_t bsize = (
sizeof(int64_t) +
sizeof(int64_t));
744 int64_t idx = this->getMinimumIndexBeyondVersion(ver);
746 while(idx < META_HEADER->fields.tail) {
755 int64_t idx = this->getMinimumIndexBeyondVersion(ver);
758 int64_t latest_version = this->getLatestVersion();
759 *(int64_t*)(buf + ofst) = latest_version;
760 ofst +=
sizeof(int64_t);
763 ofst +=
sizeof(int64_t);
766 while(idx < META_HEADER->fields.tail) {
767 ofst += writeLogEntryToByteArray(
LOG_ENTRY_AT(idx), buf + ofst);
775 const int64_t& ver) noexcept(
false) {
776 int64_t idx = this->getMinimumIndexBeyondVersion(ver);
778 int64_t latest_version = this->getLatestVersion();
779 f((
char*)&latest_version,
sizeof(int64_t));
782 f((
char*)&nr_log_entry,
sizeof(int64_t));
785 while(idx < META_HEADER->fields.tail) {
792 void FilePersistLog::applyLogTail(
char const* v) noexcept(
false) {
795 int64_t latest_version = *(
const int64_t*)(v + ofst);
796 ofst +=
sizeof(int64_t);
798 int64_t nr_log_entry = *(
const int64_t*)(v + ofst);
799 ofst +=
sizeof(int64_t);
801 while(nr_log_entry--) {
802 ofst += mergeLogEntryFromByteArray(v + ofst);
808 size_t FilePersistLog::byteSizeOfLogEntry(
const LogEntry* ple) noexcept(
false) {
809 return sizeof(
LogEntry) + ple->fields.dlen;
812 size_t FilePersistLog::writeLogEntryToByteArray(
const LogEntry* ple,
char* ba) noexcept(
false) {
813 size_t nr_written = 0;
816 if(ple->fields.dlen > 0) {
817 memcpy((
void*)(ba + nr_written), (
void*)
LOG_ENTRY_DATA(ple), ple->fields.dlen);
818 nr_written += ple->fields.dlen;
823 size_t FilePersistLog::postLogEntry(
const std::function<
void(
char const*
const, std::size_t)>& f,
const LogEntry* ple) noexcept(
false) {
824 size_t nr_written = 0;
825 f((
const char*)ple,
sizeof(
LogEntry));
827 if(ple->fields.dlen > 0) {
829 nr_written += ple->fields.dlen;
834 size_t FilePersistLog::mergeLogEntryFromByteArray(
const char* ba) noexcept(
false) {
844 dbg_default_trace(
"{0} failed to merge log entry, we don't empty log entry.", __func__);
847 if(NUM_FREE_BYTES < cple->fields.dlen) {
858 dbg_default_trace(
"{0} merge log:log entry and meta data are updated.", __func__);
907 bool FilePersistLog::checkOrCreateMetaFile() noexcept(false) {
911 bool FilePersistLog::checkOrCreateLogFile() noexcept(false) {
915 bool FilePersistLog::checkOrCreateDataFile() noexcept(false) {
919 void FilePersistLog::truncate(
const int64_t& ver) noexcept(
false) {
929 int64_t l_idx = binarySearch<int64_t>(
931 return ple->fields.ver;
949 }
catch(uint64_t e) {
965 __FILE__, __func__, errno, strerror(errno));
972 while((dent = readdir(dir)) != NULL) {
973 uint32_t name_len = strlen(dent->d_name);
978 int fd = open(fn, O_RDONLY);
981 __FILE__, __func__, errno, strerror(errno));
984 int nRead = read(fd, (
void*)&mh,
sizeof(mh));
985 if(nRead !=
sizeof(mh)) {
986 dbg_default_warn(
"{}:{} cannot load meta header from file:{}, errno={}, err={}",
987 __FILE__, __func__, errno, strerror(errno));
992 if(!found || ver > mh.fields.ver)
#define PERSIST_EXP_WRITE_FILE(x)
This file include all common types internal to derecho and not necessarily being known by a client pr...
union persistent::log_entry LogEntry
#define PERSIST_EXP_OPEN_FILE(x)
const std::enable_if<(storageType==ST_FILE||storageType==ST_MEM), version_t >::type getMinimumLatestPersistedVersion(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
get the minmum latest persisted version for a Replicated<T> identified by
#define LOG_ENTRY_DATA(e)
#define CONF_PERS_MAX_DATA_SIZE
#define dbg_default_debug(...)
#define dbg_default_error(...)
#define PERSIST_EXP_NOSPACE_LOG
auto bytes_size(const T &)
Just calls sizeof(T)
std::string getPersFilePath()
bool checkOrCreateFileWithSize(const std::string &file, uint64_t size) noexcept(false)
#define LOG_ENTRY_AT(idx)
#define PERSIST_EXP_REMOVE_FILE(x)
#define NEXT_LOG_ENTRY_PERS
#define PERSIST_EXP_NOSPACE_DATA
#define PERSIST_EXP_MMAP_FILE(x)
#define PERSIST_EXP_INV_VERSION
#define PERSIST_EXP_RWLOCK_INIT(x)
#define dbg_default_trace(...)
#define PERSIST_EXP_READ_FILE(x)
const bool getConfBoolean(const std::string &key)
#define dbg_default_info(...)
#define PERSIST_EXP_UNIMPLEMENTED
#define CONF_PERS_MAX_LOG_ENTRY
void checkOrCreateDir(const std::string &dirPath) noexcept(false)
#define PERSIST_EXP_MSYNC(x)
std::enable_if_t< std::is_pod< BR >::value > post_object(const F &f, const BR &br, Args &&... args)
In-place serialization is also sometimes possible.
#define PERSIST_EXP_INV_ENTRY_IDX(x)
std::size_t to_bytes(const ByteRepresentable &b, char *v)
calls b.to_bytes(v) when b is a ByteRepresentable; calls std::memcpy() when b is POD.
#define PERSIST_EXP_RENAME_FILE(x)
const uint64_t getConfUInt64(const std::string &key)
#define PERSIST_EXP_MUTEX_INIT(x)
struct persistent::log_entry::@1 fields
#define dbg_default_warn(...)