
I want to use a C API in a pipeline of coroutines communicating through channels. This is my first try with coroutines and my knowledge of them is limited.

The shape of the pipeline is:

 --------    1    ---------    2    ------
| source | ----> | process | ----> | sink |
 --------         ---------         ------

Each box represents a coroutine and each arrow a channel.

The C API is used in the process coroutine.

Its signature is roughly: bool start_work(consumer_callback). The API is synchronous and calls consumer_callback once for each item of data it produces.

I first considered writing to channel 2 (see the diagram above) inside the callback, but that would require changing the callback's signature, which is not possible.

Instead, I pass a coroutine handle to the callback, which resumes it. The resumed coroutine then writes the data to channel 2.

The simplified code is:

#include <cassert>   // assert
#include <coroutine>
#include <optional>
#include <string>
#include <utility>   // std::exchange
#include <boost/cobalt/channel.hpp>
#include <boost/cobalt/main.hpp>
#include <boost/cobalt/promise.hpp>
#include <boost/cobalt/join.hpp>

namespace cobalt = boost::cobalt;

// Data to communicate between the callback and the channel writer.
struct Data {
    std::optional<int> result;
    bool done = false;
    std::coroutine_handle<> coro_handle;
};

using Callback = void (*)(int, void*, bool);

void consumer_callback(int i, void* data, bool done) {
    Data& data_ = *reinterpret_cast<Data*>(data);
    data_.done = done;
    if (!done) {
        data_.result = i;
    }
    data_.coro_handle.resume();
}

// C API that produces results and calls the callback to consume each result.
// Results are integers.
void start_work(void* data, Callback cb) {
    bool done = false;
    for (int i = 0; i < 10; ++i) {
        cb(i, data, done); // !done case
    }
    done = true;
    cb(0, data, done); // done case
}

struct Awaiter : std::suspend_always {
    Data& data;
    bool first;

    bool await_ready() {
        return data.result.has_value();
    }

    void await_suspend(std::coroutine_handle<> h) {
        data.coro_handle = h;
        if (first) start_work(&data, consumer_callback);
    }

    int await_resume() {
        assert(data.result.has_value());
        auto opt = std::exchange(data.result, std::nullopt);
        return opt.value();
    }
};

Awaiter nextResult(Data& data, bool first) {
    return {{}, data, first};
}

cobalt::promise<void> source(cobalt::channel<std::string>& out) {
    co_await out.write("Hello world!");
    out.close();
}

cobalt::promise<void> process(cobalt::channel<std::string>& in, cobalt::channel<int>& out) {
    Data data;
    while (in.is_open() && out.is_open()) {
        auto _ = co_await in.read(); // ignore result for now
        auto first = true;
        while (!data.done || data.result.has_value()) {
            auto i = co_await nextResult(data, first);
            co_await out.write(i);
            first = false;
        }
    }
    in.close();
    out.close();
}

cobalt::promise<void> sink(cobalt::channel<int>& in) {
    while (in.is_open()) {
        auto i = co_await in.read(); // ignore result for now
    }
    in.close();
}

cobalt::main co_main(int argc, char* argv[]) {
    cobalt::channel<std::string> a;
    cobalt::channel<int> b;
    co_await cobalt::join(
        source(a),
        process(a, b),
        sink(b)
    );
    co_return 0;
}

The sink correctly receives all the data, but when the process coroutine finishes, Asio tries to resume a coroutine through a null pointer. What am I doing wrong? Thanks!

Environment:

Ubuntu 20.04

Boost 1.85

g++13 -std=gnu++2a
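
For comparison, here is a minimal sketch of a variant that never resumes a coroutine from inside the callback. Because the API is synchronous, the callback can simply buffer each result through the void* user-data pointer, and the results can be forwarded once start_work has returned. Buffer, buffering_callback and collect_results are hypothetical names introduced for this illustration, and start_work is the same stand-in as in the code above, not the real C API.

```cpp
#include <cassert>
#include <vector>

// Same callback shape as in the question: value, user data, done flag.
using Callback = void (*)(int, void*, bool);

// Stand-in for the synchronous C API (assumption: same behavior as above).
void start_work(void* data, Callback cb) {
    for (int i = 0; i < 10; ++i) {
        cb(i, data, false); // one call per result
    }
    cb(0, data, true);      // final call signals completion
}

// Hypothetical buffer that the callback fills through the void* pointer.
struct Buffer {
    std::vector<int> results;
    bool done = false;
};

// Plain callback: it only stores data and never touches a coroutine handle.
void buffering_callback(int i, void* data, bool done) {
    auto& buf = *static_cast<Buffer*>(data);
    if (done) {
        buf.done = true;
    } else {
        buf.results.push_back(i);
    }
}

// Runs the API to completion and returns everything it produced.
Buffer collect_results() {
    Buffer buf;
    start_work(&buf, buffering_callback);
    return buf;
}
```

In the process coroutine, the inner loop would then become a call to collect_results() followed by one co_await out.write(i) per buffered value. The trade-off is that nothing reaches channel 2 until start_work has finished, so this only fits when the results of one run can be held in memory.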

  • Edit your tags: this is C++, not C.
    – ticktalk
    Commented Jul 15 at 13:52
  • This was because of the callback from a C API, but I guess it was confusing, thanks.
    – dvnh87
    Commented Jul 15 at 14:38
  • While nothing in the coroutines specification says that resuming the coroutine during await_suspend is illegal, nothing says that it is legal either, so you are in the land of implementation-defined behavior. On all compilers this leads to corruption of the process coroutine's state; you need to go back to threads and condition variables for this.
    – Ahmed AEK
    Commented Jul 16 at 9:56
  • I see, thank you.
    – dvnh87
    Commented Jul 19 at 17:28
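
The threads-and-condition-variables suggestion from the comments could look roughly like the sketch below, which uses only the standard library. ResultQueue, queue_callback and run_on_worker are hypothetical names, and start_work is again a stand-in for the real C API. The API runs on a worker thread, the callback pushes each result into a mutex-protected queue, and the consumer pops results as they arrive.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

using Callback = void (*)(int, void*, bool);

// Stand-in for the synchronous C API (assumption: same behavior as in the question).
void start_work(void* data, Callback cb) {
    for (int i = 0; i < 10; ++i) {
        cb(i, data, false);
    }
    cb(0, data, true);
}

// Hypothetical thread-safe queue shared by the worker and the consumer.
struct ResultQueue {
    std::mutex mutex;
    std::condition_variable ready;
    std::queue<int> items;
    bool done = false;

    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            items.push(value);
        }
        ready.notify_one();
    }

    void finish() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            done = true;
        }
        ready.notify_one();
    }

    // Blocks until a value is available; returns nullopt once the
    // producer has finished and the queue is drained.
    std::optional<int> pop() {
        std::unique_lock<std::mutex> lock(mutex);
        ready.wait(lock, [this] { return !items.empty() || done; });
        if (items.empty()) {
            return std::nullopt;
        }
        int value = items.front();
        items.pop();
        return value;
    }
};

// Callback executed on the worker thread; it only touches the queue.
void queue_callback(int i, void* data, bool done) {
    auto& queue = *static_cast<ResultQueue*>(data);
    if (done) {
        queue.finish();
    } else {
        queue.push(i);
    }
}

// Runs the API on a worker thread and consumes results as they arrive.
std::vector<int> run_on_worker() {
    ResultQueue queue;
    std::thread worker([&queue] { start_work(&queue, queue_callback); });
    std::vector<int> received;
    while (auto value = queue.pop()) {
        received.push_back(*value);
    }
    worker.join();
    return received;
}
```

Note that pop() blocks the calling thread, so calling it directly from a coroutine would stall the event loop. The sketch only shows the hand-off between the C callback and the consumer; connecting it to channel 2 would still need a non-blocking bridge into the coroutine side.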
