Hopefully fixed a race condition that allowed multiple controllers to run at once on the same event.

Marco Satti 2018-01-26 17:16:54 +08:00
parent 90b8b40d4d
commit 17e8d206e5
6 changed files with 394 additions and 77 deletions
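
The fix adds a debug-only re-entrancy guard to each core controller: an atomic flag is set on entry to handle_event() and cleared on exit, and if the flag is already set the controller throws, signalling that two workers picked up the same controller at once. Below is a minimal standalone sketch of that guard pattern; the names are illustrative, and std::atomic_bool::exchange is used here so the check-and-set is a single atomic step, whereas the diff below checks and then sets the flag separately.

#include <atomic>
#include <stdexcept>

// Debug-only flag: true while a controller of this type is inside handle_event().
static std::atomic_bool debug_in_controller{false};

void handle_event_guarded()
{
    // exchange() returns the previous value, so "true" means another thread
    // was already inside this controller - exactly the race being hunted.
    if (debug_in_controller.exchange(true))
        throw std::runtime_error("Controller is already running!");

    // ... dispatch the event to the controller as usual ...

    debug_in_controller = false;
}

The task executor changes further below then make sure each run's tasks are fully staged before any worker starts, and that Core::run() only returns once every worker is idle again.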


@ -1,5 +1,9 @@
#pragma once
#if defined(BUILD_DEBUG)
#include <atomic>
#endif
#include "Controller/ControllerEvent.hpp"
class Core;
@ -20,6 +24,11 @@ public:
virtual void handle_event(const ControllerEvent & e) const = 0;
void handle_event_marshall_(const ControllerEvent & e)
{
handle_event(e);
}
protected:
Core * core;
};


@ -1,5 +1,6 @@
#include <sstream>
#include <algorithm>
#include <atomic>
#include <boost/format.hpp>
#include "Core.hpp"
@ -13,6 +14,10 @@
#include "Resources/Ee/Dmac/EeDmacConstants.hpp"
#include "Resources/Ee/Intc/EeIntcConstants.hpp"
#if defined(BUILD_DEBUG)
std::atomic_bool DEBUG_IN_CONTROLLER_EECORE = false;
#endif
CEeCoreInterpreter::CEeCoreInterpreter(Core * core) :
CController(core),
c_vu_interpreter(core)
@ -30,6 +35,12 @@ CEeCoreInterpreter::~CEeCoreInterpreter()
void CEeCoreInterpreter::handle_event(const ControllerEvent & event) const
{
#if defined(BUILD_DEBUG)
if (DEBUG_IN_CONTROLLER_EECORE)
throw std::runtime_error("EeCore controller is already running!");
DEBUG_IN_CONTROLLER_EECORE = true;
#endif
switch (event.type)
{
case ControllerEvent::Type::Time:
@ -44,6 +55,10 @@ void CEeCoreInterpreter::handle_event(const ControllerEvent & event) const
throw std::runtime_error("CEeCoreInterpreter event handler not implemented - please fix!");
}
}
#if defined(BUILD_DEBUG)
DEBUG_IN_CONTROLLER_EECORE = false;
#endif
}
int CEeCoreInterpreter::time_to_ticks(const double time_us) const


@ -1,6 +1,7 @@
#include <utility>
#include <algorithm>
#include <sstream>
#include <atomic>
#include <boost/format.hpp>
#include "Core.hpp"
@ -13,6 +14,10 @@
#include "Resources/Iop/Dmac/IopDmacConstants.hpp"
#include "Resources/Iop/Intc/IopIntcConstants.hpp"
#if defined(BUILD_DEBUG)
std::atomic_bool DEBUG_IN_CONTROLLER_IOPCORE = false;
#endif
CIopCoreInterpreter::CIopCoreInterpreter(Core * core) :
CController(core)
{
@ -29,6 +34,12 @@ CIopCoreInterpreter::~CIopCoreInterpreter()
void CIopCoreInterpreter::handle_event(const ControllerEvent & event) const
{
#if defined(BUILD_DEBUG)
if (DEBUG_IN_CONTROLLER_IOPCORE)
throw std::runtime_error("IopCore controller is already running!");
DEBUG_IN_CONTROLLER_IOPCORE = true;
#endif
switch (event.type)
{
case ControllerEvent::Type::Time:
@ -43,6 +54,10 @@ void CIopCoreInterpreter::handle_event(const ControllerEvent & event) const
throw std::runtime_error("CIopCoreInterpreter event handler not implemented - please fix!");
}
}
#if defined(BUILD_DEBUG)
DEBUG_IN_CONTROLLER_IOPCORE = false;
#endif
}
int CIopCoreInterpreter::time_to_ticks(const double time_us) const


@ -6,6 +6,7 @@
#include <boost/log/utility/setup/console.hpp>
#include <boost/log/utility/setup/file.hpp>
#include <boost/log/sinks/text_file_backend.hpp>
#include <boost/format.hpp>
#include <chrono>
#include "Core.hpp"
@ -118,6 +119,17 @@ RResources & Core::get_resources() const
void Core::run()
{
#if defined(BUILD_DEBUG)
static double DEBUG_TIME_ELAPSED = 0.0;
static double DEBUG_TIME_LOGGED = 0.0;
if ((DEBUG_TIME_ELAPSED - DEBUG_TIME_LOGGED) > 0.01e6)
{
BOOST_LOG(get_logger()) << boost::format("Emulation time elapsed: %.3f") % (DEBUG_TIME_ELAPSED / 1e6);
DEBUG_TIME_LOGGED = DEBUG_TIME_ELAPSED;
}
DEBUG_TIME_ELAPSED += options.time_slice_per_run_us;
#endif
// Enqueue time events (always done on each run).
auto event = ControllerEvent{ ControllerEvent::Type::Time, options.time_slice_per_run_us };
for (int i = 0; i < static_cast<int>(ControllerType::Type::COUNT); i++) // TODO: find better syntax..
@ -133,14 +145,20 @@ void Core::run()
auto task = [this, entry] ()
{
if (controllers[entry.t])
controllers[entry.t]->handle_event(entry.e);
controllers[entry.t]->handle_event_marshall_(entry.e);
};
task_executor->enqueue_task(task);
}
// Wait for sync (task executor has no more tasks).
// Dispatch all tasks and wait for resynchronisation.
task_executor->dispatch();
task_executor->wait_for_idle();
#if defined(BUILD_DEBUG)
if (!task_executor->task_sync.running_task_queue.is_empty() || task_executor->task_sync.thread_busy_counter.busy_counter)
throw std::runtime_error("Task queue was not empty!");
#endif
}
void Core::enqueue_controller_event(const ControllerType::Type c_type, const ControllerEvent & event)


@ -38,22 +38,20 @@ public:
void wait_for_empty()
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return is_empty(); });
full_cv.wait(empty_guard, [this] { return is_empty(); });
}
void pop(ItemTy & item)
{
std::unique_lock<std::mutex> reading_guard(reading_lock);
if (is_empty())
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
}
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
empty_guard.unlock();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_one();
full_cv.notify_all();
}
/// Tries to pop the front of the queue with a timeout.
@ -61,20 +59,17 @@ public:
bool try_pop(ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> reading_guard(reading_lock);
bool item_available = true;
if (is_empty())
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
item_available = empty_cv.wait_for(empty_guard, timeout, [this] { return !is_empty(); });
}
std::unique_lock<std::mutex> empty_guard(empty_lock);
bool item_available = empty_cv.wait_for(empty_guard, timeout, [this] { return !is_empty(); });
empty_guard.unlock();
if (item_available)
{
atomic_fn();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_one();
full_cv.notify_all();
return true;
}
@ -90,15 +85,13 @@ public:
{
std::unique_lock<std::mutex> writing_guard(writing_lock);
if (is_full())
{
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
}
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
full_guard.unlock();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_one();
empty_cv.notify_all();
}
/// Tries to push to the back of the queue with a timeout.
@ -106,20 +99,17 @@ public:
bool try_push(const ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> writing_guard(writing_lock);
bool space_available = true;
if (is_full())
{
std::unique_lock<std::mutex> full_guard(full_lock);
space_available = full_cv.wait(full_guard, timeout, [this] { return !is_full(); });
}
std::unique_lock<std::mutex> full_guard(full_lock);
bool space_available = full_cv.wait_for(full_guard, timeout, [this] { return !is_full(); });
full_guard.unlock();
if (space_available)
{
atomic_fn();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_one();
empty_cv.notify_all();
return true;
}
@ -147,6 +137,258 @@ private:
QueueTy queue;
};
/// MPSC blocking/try queue.
/// Thread safe for multiple producers; only a single consumer is allowed.
template<typename ItemTy, size_t capacity>
class MpscQueue
{
public:
typedef boost::lockfree::spsc_queue<ItemTy, boost::lockfree::capacity<capacity>> QueueTy;
typedef typename QueueTy::size_type SizeTy;
bool has_read_available(const SizeTy n_items = 1) const
{
return queue.read_available() >= n_items;
}
bool has_write_available(const SizeTy n_items = 1) const
{
return queue.write_available() >= n_items;
}
bool is_empty() const
{
return !has_read_available();
}
bool is_full() const
{
return !has_write_available();
}
void wait_for_empty()
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
full_cv.wait(empty_guard, [this] { return is_empty(); });
}
void pop(ItemTy & item)
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
empty_guard.unlock();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_all();
}
/// Tries to pop the front of the queue with a timeout.
/// Executes the given lambda atomically (while popping the queue).
bool try_pop(ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
bool item_available = empty_cv.wait_for(empty_guard, timeout, [this] { return !is_empty(); });
empty_guard.unlock();
if (item_available)
{
atomic_fn();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_all();
return true;
}
return false;
}
bool try_pop(ItemTy & item)
{
return try_pop(item, std::chrono::nanoseconds(0), [] {});
}
void push(const ItemTy & item)
{
std::unique_lock<std::mutex> writing_guard(writing_lock);
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
full_guard.unlock();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_all();
}
/// Tries to push to the back of the queue with a timeout.
/// Executes the given lambda atomically (while pushing the queue).
bool try_push(const ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> writing_guard(writing_lock);
std::unique_lock<std::mutex> full_guard(full_lock);
bool space_available = full_cv.wait_for(full_guard, timeout, [this] { return !is_full(); });
full_guard.unlock();
if (space_available)
{
atomic_fn();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_all();
return true;
}
return false;
}
bool try_push(const ItemTy & item)
{
return try_push(item, std::chrono::nanoseconds(0), [] {});
}
/// Not thread safe.
void reset()
{
queue.reset();
}
private:
std::mutex writing_lock;
std::mutex empty_lock;
std::mutex full_lock;
std::condition_variable empty_cv;
std::condition_variable full_cv;
QueueTy queue;
};
/// SPMC blocking/try queue.
/// Thread safe for multiple consumers; only a single producer is allowed.
template<typename ItemTy, size_t capacity>
class SpmcQueue
{
public:
typedef boost::lockfree::spsc_queue<ItemTy, boost::lockfree::capacity<capacity>> QueueTy;
typedef typename QueueTy::size_type SizeTy;
bool has_read_available(const SizeTy n_items = 1) const
{
return queue.read_available() >= n_items;
}
bool has_write_available(const SizeTy n_items = 1) const
{
return queue.write_available() >= n_items;
}
bool is_empty() const
{
return !has_read_available();
}
bool is_full() const
{
return !has_write_available();
}
void wait_for_empty()
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
full_cv.wait(empty_guard, [this] { return is_empty(); });
}
void pop(ItemTy & item)
{
std::unique_lock<std::mutex> reading_guard(reading_lock);
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
empty_guard.unlock();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_all();
}
/// Tries to pop the front of the queue with a timeout.
/// Executes the given lambda atomically (while popping the queue).
bool try_pop(ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> reading_guard(reading_lock);
std::unique_lock<std::mutex> empty_guard(empty_lock);
bool item_available = empty_cv.wait_for(empty_guard, timeout, [this] { return !is_empty(); });
empty_guard.unlock();
if (item_available)
{
atomic_fn();
if (!queue.pop(item))
throw std::runtime_error("Popping MpmcQueue failed");
full_cv.notify_all();
return true;
}
return false;
}
bool try_pop(ItemTy & item)
{
return try_pop(item, std::chrono::nanoseconds(0), [] {});
}
void push(const ItemTy & item)
{
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
full_guard.unlock();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_all();
}
/// Tries to push to the back of the queue with a timeout.
/// Executes the given lambda atomically (while pushing the queue).
bool try_push(const ItemTy & item, const std::chrono::nanoseconds timeout, const std::function<void()> & atomic_fn)
{
std::unique_lock<std::mutex> full_guard(full_lock);
bool space_available = full_cv.wait_for(full_guard, timeout, [this] { return !is_full(); });
full_guard.unlock();
if (space_available)
{
atomic_fn();
if (!queue.push(item))
throw std::runtime_error("Pushing MpmcQueue failed");
empty_cv.notify_all();
return true;
}
return false;
}
bool try_push(const ItemTy & item)
{
return try_push(item, std::chrono::nanoseconds(0), [] {});
}
/// Not thread safe.
void reset()
{
queue.reset();
}
private:
std::mutex reading_lock;
std::mutex empty_lock;
std::mutex full_lock;
std::condition_variable empty_cv;
std::condition_variable full_cv;
QueueTy queue;
};
/// SPSC blocking/try queue.
/// No thread safety - only 1 producer and 1 consumer allowed.
template<typename ItemTy, size_t capacity>
@ -179,48 +421,44 @@ public:
void wait_for_empty()
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return is_empty(); });
full_cv.wait(empty_guard, [this] { return is_empty(); });
}
void pop(ItemTy & item)
{
if (is_empty())
{
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
}
std::unique_lock<std::mutex> empty_guard(empty_lock);
empty_cv.wait(empty_guard, [this] { return !is_empty(); });
empty_guard.unlock();
if (!queue.pop(item))
throw std::runtime_error("Could not pop from MpmcQueue.");
full_cv.notify_one();
full_cv.notify_all();
}
bool try_pop(ItemTy & item)
{
bool result = queue.pop(item);
if (result)
full_cv.notify_one();
full_cv.notify_all();
return result;
}
void push(const ItemTy & item)
{
if (is_full())
{
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
}
std::unique_lock<std::mutex> full_guard(full_lock);
full_cv.wait(full_guard, [this] { return !is_full(); });
full_guard.unlock();
if (!queue.push(item))
throw std::runtime_error("Could not push to MpmcQueue.");
empty_cv.notify_one();
empty_cv.notify_all();
}
bool try_push(const ItemTy & item)
{
bool result = queue.push(item);
if (result)
empty_cv.notify_one();
empty_cv.notify_all();
return result;
}
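
The queue changes above replace the conditional "only lock and wait if currently empty/full" pattern with an unconditional predicate wait, and switch notify_one to notify_all so every blocked caller re-checks its predicate. Below is a self-contained sketch of the same blocking push/pop idea, using a single mutex and std::queue instead of the header's separate locks and boost::lockfree buffer; it is illustrative only.

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

template <typename T, std::size_t Capacity>
class BlockingQueue
{
public:
    void push(const T & item)
    {
        std::unique_lock<std::mutex> lock(mtx);
        // Predicate waits are (re-)checked under the lock, so a notification
        // can no longer slip in between a fullness check and the wait.
        full_cv.wait(lock, [this] { return queue.size() < Capacity; });
        queue.push(item);
        empty_cv.notify_all();
    }

    void pop(T & item)
    {
        std::unique_lock<std::mutex> lock(mtx);
        empty_cv.wait(lock, [this] { return !queue.empty(); });
        item = queue.front();
        queue.pop();
        full_cv.notify_all();
    }

private:
    std::mutex mtx;
    std::condition_variable empty_cv;
    std::condition_variable full_cv;
    std::queue<T> queue;
};

Waiting with a predicate under the lock is what closes the window in the old code, where a notification delivered between the is_empty()/is_full() check and the wait could be missed.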


@ -1,5 +1,6 @@
#pragma once
#include <exception>
#include <stdexcept>
#include <memory>
#include <thread>
@ -8,10 +9,9 @@
#include <mutex>
#include <atomic>
#include <chrono>
#include <string>
#include <Queues.hpp>
typedef MpmcQueue<std::function<void()>, 128> TaskQueue;
struct BusyCounter
{
BusyCounter() : busy_counter(0) {}
@ -39,17 +39,28 @@ struct BusyCounter
}
private:
#if defined(BUILD_DEBUG)
friend class Core;
#endif
int busy_counter;
std::mutex busy_counter_mtx;
std::condition_variable busy_counter_cv;
};
/// Executor/Thread synchronisation resources.
/// With ex_ptr, we don't care if two threads try to write to it at once -
we only care that at least one exception occurred; it doesn't matter which one.
struct TaskSync
{
TaskSync() : exit(false) {}
TaskQueue task_queue;
BusyCounter busy_count;
SpmcQueue<std::function<void()>, 128> running_task_queue;
MpscQueue<std::function<void()>, 128> pending_task_queue;
BusyCounter thread_busy_counter;
MpscQueue<std::string, 32> thread_error_queue;
bool exit;
};
@ -57,6 +68,8 @@ struct TaskSync
class Worker
{
public:
static constexpr std::chrono::nanoseconds TIMEOUT = std::chrono::nanoseconds(500);
Worker(TaskSync & task_sync) :
local_exit(false),
task_sync(task_sync)
@ -70,38 +83,33 @@ public:
thread.join();
}
void handle_exception_check() const
{
if (ex_ptr)
std::rethrow_exception(ex_ptr);
}
private:
void main_thread_()
{
while (!task_sync.exit && !ex_ptr && !local_exit)
while (!task_sync.exit && !local_exit)
{
std::function<void()> task_fn;
if (task_sync.task_queue.try_pop(task_fn, std::chrono::nanoseconds(500), [this] { task_sync.busy_count++; }))
if (task_sync.running_task_queue.try_pop(task_fn, TIMEOUT, [this] { task_sync.thread_busy_counter++; }))
{
try
{
task_fn();
}
catch (...)
catch (const std::exception & error)
{
ex_ptr = std::current_exception();
// Add exception to global queue for the executor to deal with.
std::string error_str(error.what());
task_sync.thread_error_queue.push(error_str);
}
task_sync.busy_count--;
task_sync.thread_busy_counter--;
}
}
}
bool local_exit; // Kinda not needed but makes the code a bit nicer by not exposing join()
// publicly, instead guaranteeing the thread will exit.
bool local_exit; // Kinda not needed but makes the code a bit nicer by not
// exposing join() publicly, instead guaranteeing the
// thread will exit.
TaskSync & task_sync;
std::exception_ptr ex_ptr;
std::thread thread;
};
@ -122,29 +130,43 @@ public:
void enqueue_task(const std::function<void()> & fn)
{
task_sync.task_queue.push(fn);
task_sync.pending_task_queue.push(fn);
}
void dispatch()
{
while (!task_sync.pending_task_queue.is_empty())
{
std::function<void()> fn;
task_sync.pending_task_queue.pop(fn);
task_sync.running_task_queue.push(fn);
}
}
void wait_for_idle()
{
while (true)
// Wait for empty running task queue.
task_sync.running_task_queue.wait_for_empty();
// Wait for paused worker threads.
task_sync.thread_busy_counter.wait_for_idle();
// Check if any exceptions occurred, and rethrow them on the current thread.
// TODO: only the first error is thrown for now... Not sure we will
// ever change this.
if (!task_sync.thread_error_queue.is_empty())
{
// Wait for idle task threads.
task_sync.busy_count.wait_for_idle();
// Check if any worker raised an exception.
for (const auto& worker : workers)
worker->handle_exception_check();
// Make sure the task queue is actually empty - we could have caught
// the workers in a state where they are in the process of
// retrieving a task.
if (task_sync.task_queue.is_empty())
break;
std::string error_str;
task_sync.thread_error_queue.pop(error_str);
throw std::runtime_error(error_str);
}
}
private:
#if defined(BUILD_DEBUG)
friend class Core;
#endif
TaskSync task_sync;
std::vector<std::unique_ptr<Worker>> workers;
};
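
The executor now stages work in pending_task_queue, publishes it to running_task_queue in dispatch(), and wait_for_idle() blocks until the running queue is drained and the busy counter hits zero before rethrowing any error the workers queued. Below is a minimal self-contained sketch of that stage/dispatch/wait-for-idle flow; the names (MiniExecutor) are illustrative, and a single mutex stands in for the header's per-queue locks, lock-free buffers and error queue.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniExecutor
{
public:
    explicit MiniExecutor(const std::size_t n_threads)
    {
        for (std::size_t i = 0; i < n_threads; i++)
            workers.emplace_back([this] { worker_loop(); });
    }

    ~MiniExecutor()
    {
        {
            std::lock_guard<std::mutex> lock(mtx);
            exiting = true;
        }
        work_cv.notify_all();
        for (auto & worker : workers)
            worker.join();
    }

    // Stage a task; nothing runs until dispatch() is called.
    void enqueue_task(std::function<void()> fn)
    {
        std::lock_guard<std::mutex> lock(mtx);
        pending.push(std::move(fn));
    }

    // Publish all staged tasks to the workers in one go.
    void dispatch()
    {
        {
            std::lock_guard<std::mutex> lock(mtx);
            while (!pending.empty())
            {
                running.push(std::move(pending.front()));
                pending.pop();
            }
        }
        work_cv.notify_all();
    }

    // Block until the running queue is drained and no worker is mid-task.
    void wait_for_idle()
    {
        std::unique_lock<std::mutex> lock(mtx);
        idle_cv.wait(lock, [this] { return running.empty() && busy == 0; });
    }

private:
    void worker_loop()
    {
        while (true)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx);
                work_cv.wait(lock, [this] { return exiting || !running.empty(); });
                if (exiting)
                    return;
                task = std::move(running.front());
                running.pop();
                busy++; // Counted as busy until the popped task has finished.
            }
            task(); // Error handling omitted; the real executor queues worker errors.
            {
                std::lock_guard<std::mutex> lock(mtx);
                busy--;
            }
            idle_cv.notify_all();
        }
    }

    std::queue<std::function<void()>> pending;
    std::queue<std::function<void()>> running;
    std::mutex mtx;
    std::condition_variable work_cv;
    std::condition_variable idle_cv;
    std::size_t busy = 0;
    bool exiting = false;
    std::vector<std::thread> workers;
};

Core::run() above then reduces to one enqueue_task() per controller, followed by dispatch() and wait_for_idle().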