mirror of
https://github.com/hch12907/orbum.git
synced 2024-06-01 19:08:05 -04:00
191 lines
4.8 KiB
C++
191 lines
4.8 KiB
C++
#pragma once
|
|
|
|
#include <functional>
|
|
#include <mutex>
|
|
#include <stdexcept>
|
|
#include <vector>
|
|
#include <condition_variable>
|
|
|
|
#include <boost/lockfree/spsc_queue.hpp>
|
|
|
|
#include <cereal/cereal.hpp>
|
|
|
|
// TODO: the Mpsc/Spmc/Spsc classes are just Mpmc aliases...
|
|
// In order to support the "events" a mutex is needed
|
|
// anyway, even for simple cases like the Spsc queue.
|
|
|
|
/// MPMC blocking/try queue.
|
|
/// Thread safe for all producers and consumers.
|
|
template <typename ItemTy, size_t capacity>
|
|
class MpmcQueue
|
|
{
|
|
public:
|
|
using QueueTy = boost::lockfree::spsc_queue<ItemTy, boost::lockfree::capacity<capacity>>;
|
|
using SizeTy = typename QueueTy::size_type;
|
|
|
|
static constexpr std::chrono::nanoseconds ZERO_TIMEOUT = std::chrono::nanoseconds(0);
|
|
|
|
bool has_read_available(const SizeTy n_items = 1) const
|
|
{
|
|
return queue.read_available() >= n_items;
|
|
}
|
|
|
|
bool has_write_available(const SizeTy n_items = 1) const
|
|
{
|
|
return queue.write_available() >= n_items;
|
|
}
|
|
|
|
bool is_empty() const
|
|
{
|
|
return !has_read_available();
|
|
}
|
|
|
|
bool is_full() const
|
|
{
|
|
return !has_write_available();
|
|
}
|
|
|
|
void wait_for_empty()
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
empty_cv.wait(lock, [this] { return is_empty(); });
|
|
}
|
|
|
|
void wait_for_full()
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
full_cv.wait(lock, [this] { return is_full(); });
|
|
}
|
|
|
|
void pop(ItemTy& item, const std::function<void()>& atomic_fn = [] {})
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
|
|
push_cv.wait(lock, [this] { return !is_empty(); });
|
|
|
|
_pop(item, atomic_fn);
|
|
}
|
|
|
|
bool try_pop(ItemTy& item, const std::chrono::nanoseconds timeout = ZERO_TIMEOUT, const std::function<void()>& atomic_fn = [] {})
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
|
|
if (push_cv.wait_for(lock, timeout, [this] { return !is_empty(); }))
|
|
{
|
|
_pop(item, atomic_fn);
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void push(const ItemTy& item, const std::function<void()>& atomic_fn = [] {})
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
|
|
pop_cv.wait(lock, [this] { return !is_full(); });
|
|
|
|
_push(item, atomic_fn);
|
|
}
|
|
|
|
bool try_push(const ItemTy& item, const std::chrono::nanoseconds timeout = ZERO_TIMEOUT, const std::function<void()>& atomic_fn = [] {})
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
|
|
if (pop_cv.wait_for(lock, timeout, [this] { return !is_full(); }))
|
|
{
|
|
_push(item, atomic_fn);
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
/// Not thread safe.
|
|
void reset()
|
|
{
|
|
queue.reset();
|
|
}
|
|
|
|
private:
|
|
/// Internal pop method used when lock is held.
|
|
void _pop(ItemTy& item, const std::function<void()>& atomic_fn)
|
|
{
|
|
atomic_fn();
|
|
|
|
if (!queue.pop(item))
|
|
throw std::runtime_error("Popping MpmcQueue failed");
|
|
|
|
if (is_empty())
|
|
empty_cv.notify_one();
|
|
|
|
pop_cv.notify_one();
|
|
}
|
|
|
|
/// Internal push method used when lock is held.
|
|
void _push(const ItemTy& item, const std::function<void()>& atomic_fn)
|
|
{
|
|
atomic_fn();
|
|
|
|
if (!queue.push(item))
|
|
throw std::runtime_error("Pushing MpmcQueue failed");
|
|
|
|
if (is_full())
|
|
full_cv.notify_one();
|
|
|
|
push_cv.notify_one();
|
|
}
|
|
|
|
std::mutex mutex;
|
|
std::condition_variable pop_cv;
|
|
std::condition_variable push_cv;
|
|
std::condition_variable empty_cv;
|
|
std::condition_variable full_cv;
|
|
QueueTy queue;
|
|
|
|
public:
|
|
template<class Archive>
|
|
void save(Archive & archive) const
|
|
{
|
|
size_t length = queue.read_available();
|
|
std::vector<ItemTy> vec(length);
|
|
|
|
for (size_t i = 0; i < length; i++)
|
|
vec[i] = *(&queue.front() + i * sizeof(ItemTy));
|
|
|
|
archive(CEREAL_NVP(length));
|
|
archive.saveBinaryValue(vec.data(), vec.size(), "data");
|
|
}
|
|
|
|
template<class Archive>
|
|
void load(Archive & archive)
|
|
{
|
|
queue.reset();
|
|
|
|
size_t length;
|
|
archive(CEREAL_NVP(length));
|
|
|
|
std::vector<ItemTy> vec(length);
|
|
|
|
archive.loadBinaryValue(vec.data(), vec.size(), "data");
|
|
|
|
for (const auto& item : vec)
|
|
queue.push(item);
|
|
}
|
|
};
|
|
|
|
/// MPSC blocking/try queue.
|
|
/// Thread safe for all producers and only one consumer allowed.
|
|
template <typename ItemTy, size_t capacity>
|
|
using MpscQueue = MpmcQueue<ItemTy, capacity>;
|
|
|
|
/// SPMC blocking/try queue.
|
|
/// Thread safe for all consumers and only one producer allowed.
|
|
template <typename ItemTy, size_t capacity>
|
|
using SpmcQueue = MpmcQueue<ItemTy, capacity>;
|
|
|
|
/// SPSC blocking/try queue.
|
|
/// No thread safety - only 1 producer and 1 consumer allowed.
|
|
template <typename ItemTy, size_t capacity>
|
|
using SpscQueue = MpmcQueue<ItemTy, capacity>;
|