7 #define VERSION_FUNC_IDX (0) 8 #define PERSIST_FUNC_IDX (1) 9 #define TRIM_FUNC_IDX (2) 10 #define GET_ML_PERSISTED_VER (3) 11 #define TRUNCATE_FUNC_IDX (4) 15 const std::type_index& subgroup_type,
16 uint32_t subgroup_index,
18 _subgroup_prefix(generate_prefix(subgroup_type, subgroup_index, shard_num)),
19 _temporal_query_frontier_provider(tqfp) {
27 callFunc<VERSION_FUNC_IDX>(ver, mhlc);
31 return callFuncMin<PERSIST_FUNC_IDX, int64_t>();
35 callFunc<TRIM_FUNC_IDX>(earliest_version);
39 return callFuncMin<GET_ML_PERSISTED_VER, int64_t>();
43 PersistentRegistry::earliest_version_to_serialize = ver;
55 callFunc<TRUNCATE_FUNC_IDX>(last_version);
65 auto tuple_val = std::make_tuple(vf, pf, tf, lpgf, tcf);
66 std::size_t key = std::hash<std::string>{}(obj_name);
67 auto res = this->
_registry.insert(std::pair<std::size_t, std::tuple<VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc>>(key, tuple_val));
68 if(res.second ==
false) {
71 this->
_registry.insert(std::pair<std::size_t, std::tuple<VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc>>(key, tuple_val));
89 const std::type_index& subgroup_type,
90 uint32_t subgroup_index,
91 uint32_t shard_num) noexcept(
true) {
92 const char* subgroup_type_name = subgroup_type.name();
93 char prefix[strlen(subgroup_type_name) * 2 + 32];
95 for(i = 0; i < strlen(subgroup_type.name()); i++) {
96 sprintf(prefix + 2 * i,
"%02x", subgroup_type_name[i]);
98 sprintf(prefix + 2 * i,
"-%u-%u", subgroup_index, shard_num);
99 return std::string(prefix);
103 const std::string str,
104 const std::type_index& subgroup_type,
105 uint32_t subgroup_index,
106 uint32_t shard_num) noexcept(
true) {
107 std::string prefix =
generate_prefix(subgroup_type, subgroup_index, shard_num);
109 if(prefix == str.substr(0, prefix.length()))
111 }
catch(
const std::out_of_range&) {
This file include all common types internal to derecho and not necessarily being known by a client pr...
void trim(const int64_t &earliest_version) noexcept(false)
Trims the log of all versions earlier than the argument.
const std::string _subgroup_prefix
this appears in the first part of storage file for persistent<T>
static thread_local int64_t earliest_version_to_serialize
Set the earliest version to serialize for recovery.
const char * get_subgroup_prefix()
Get prefix for subgroup, this will appear in the file name of Persistent<T>
std::function< void(const version_t &, const HLC &)> VersionFunc
std::function< const version_t(void)> PersistFunc
void truncate(const int64_t &last_version)
Truncates the log, deleting all versions newer than the provided argument.
const int64_t getMinimumLatestPersistedVersion() noexcept(false)
Returns the minimum of the latest persisted versions among all Persistent fields. ...
static void setEarliestVersionToSerialize(const int64_t &ver) noexcept(true)
Set the earliest version for serialization, exclusive.
static int64_t getEarliestVersionToSerialize() noexcept(true)
Returns the earliest version for serialization.
std::map< std::size_t, std::tuple< VersionFunc, PersistFunc, TrimFunc, LatestPersistedGetterFunc, TruncateFunc > > _registry
Callback registry.
const int64_t persist() noexcept(false)
(attempt to) Persist all existing versions
void makeVersion(const int64_t &ver, const HLC &mhlc) noexcept(false)
Make a new version capturing the current state of the object.
static bool match_prefix(const std::string str, const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num) noexcept(true)
match prefix
std::function< void(const int64_t &)> TruncateFunc
PersistentRegistry(ITemporalQueryFrontierProvider *tqfp, const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num)
std::function< void(const version_t &)> TrimFunc
virtual ~PersistentRegistry()
std::function< const version_t(void)> LatestPersistedGetterFunc
void updateTemporalFrontierProvider(ITemporalQueryFrontierProvider *tqfp)
update temporal query frontier we didn't use a lock on this becuase we assume this is only updated ob...
static void resetEarliestVersionToSerialize() noexcept(true)
Reset the earliest version for serialization to an invalid "uninitialized" state. ...
void registerPersist(const char *obj_name, const VersionFunc &vf, const PersistFunc &pf, const TrimFunc &tf, const LatestPersistedGetterFunc &lpgf, const TruncateFunc &tcf) noexcept(false)
set the latest version for serialization register a Persistent<T> along with its lambda ...
void unregisterPersist(const char *obj_name) noexcept(false)
deregister
static std::string generate_prefix(const std::type_index &subgroup_type, uint32_t subgroup_index, uint32_t shard_num) noexcept(true)
prefix generator prefix format: [hex of subgroup_type]-[subgroup_index]-[shard_num] ...
ITemporalQueryFrontierProvider * _temporal_query_frontier_provider
Pointer to an entity providing TemporalQueryFrontier service.