Add some rpl operators.

This commit is contained in:
John Preston 2017-09-12 16:58:14 +03:00
parent 873ccf8096
commit 487ddb5694
20 changed files with 1108 additions and 74 deletions

View file

@ -0,0 +1,37 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
#include <rpl/filter.h>
namespace rpl {
template <typename SideEffect>
auto before_next(SideEffect &&method) {
return filter([method = std::forward<SideEffect>(method)](
const auto &value) {
method(value);
return true;
});
}
} // namespace rpl

View file

@ -0,0 +1,35 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <typename Value = empty_value, typename Error = no_error>
producer<Value, Error> complete() {
return [](const consumer<Value, Error> &consumer) mutable {
consumer.put_done();
return lifetime();
};
}
} // namespace rpl

View file

@ -20,8 +20,9 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include "rpl/lifetime.h"
#include <mutex>
#include <gsl/gsl_assert>
#include <rpl/lifetime.h>
namespace rpl {
@ -36,7 +37,10 @@ struct no_error {
struct empty_value {
};
template <typename Value, typename Error>
struct empty_error {
};
template <typename Value = empty_value, typename Error = no_error>
class consumer {
public:
template <
@ -57,7 +61,11 @@ public:
void put_error_copy(const Error &error) const;
void put_done() const;
void set_lifetime(lifetime &&lifetime) const;
void add_lifetime(lifetime &&lifetime) const;
template <typename Type, typename... Args>
Type *make_state(Args&& ...args) const;
void terminate() const;
bool operator==(const consumer &other) const {
@ -103,7 +111,11 @@ public:
virtual void put_error(Error &&error) = 0;
virtual void put_done() = 0;
void set_lifetime(lifetime &&lifetime);
void add_lifetime(lifetime &&lifetime);
template <typename Type, typename... Args>
Type *make_state(Args&& ...args);
void terminate();
protected:
@ -210,14 +222,21 @@ void consumer<Value, Error>::put_done() const {
}
template <typename Value, typename Error>
void consumer<Value, Error>::set_lifetime(lifetime &&lifetime) const {
void consumer<Value, Error>::add_lifetime(lifetime &&lifetime) const {
if (_instance) {
_instance->set_lifetime(std::move(lifetime));
_instance->add_lifetime(std::move(lifetime));
} else {
lifetime.destroy();
}
}
template <typename Value, typename Error>
template <typename Type, typename... Args>
Type *consumer<Value, Error>::make_state(Args&& ...args) const {
Expects(_instance != nullptr);
return _instance->template make_state<Type>(std::forward<Args>(args)...);
}
template <typename Value, typename Error>
void consumer<Value, Error>::terminate() const {
if (_instance) {
@ -226,7 +245,7 @@ void consumer<Value, Error>::terminate() const {
}
template <typename Value, typename Error>
void consumer<Value, Error>::abstract_consumer_instance::set_lifetime(
void consumer<Value, Error>::abstract_consumer_instance::add_lifetime(
lifetime &&lifetime) {
std::unique_lock<std::mutex> lock(_mutex);
if (_terminated) {
@ -234,10 +253,19 @@ void consumer<Value, Error>::abstract_consumer_instance::set_lifetime(
lifetime.destroy();
} else {
_lifetime = std::move(lifetime);
_lifetime.add(std::move(lifetime));
}
}
template <typename Value, typename Error>
template <typename Type, typename... Args>
Type *consumer<Value, Error>::abstract_consumer_instance::make_state(
Args&& ...args) {
std::unique_lock<std::mutex> lock(_mutex);
Expects(!_terminated);
return _lifetime.template make_state<Type>(std::forward<Args>(args)...);
}
template <typename Value, typename Error>
void consumer<Value, Error>::abstract_consumer_instance::terminate() {
std::unique_lock<std::mutex> lock(_mutex);

View file

@ -0,0 +1,38 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <
typename Creator,
typename Value = typename decltype(std::declval<Creator>()())::value_type,
typename Error = typename decltype(std::declval<Creator>()())::error_type>
producer<Value, Error> deferred(Creator &&creator) {
return [creator = std::forward<Creator>(creator)](
const consumer<Value, Error> &consumer) mutable {
return std::move(creator)().start_existing(consumer);
};
}
} // namespace rpl

View file

@ -0,0 +1,62 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
#include "base/optional.h"
namespace rpl {
namespace details {
class distinct_until_changed_helper {
public:
template <typename Value, typename Error>
rpl::producer<Value, Error> operator()(
rpl::producer<Value, Error> &&initial) const {
return [initial = std::move(initial)](
const consumer<Value, Error> &consumer) mutable {
auto previous = consumer.make_state<
base::optional<Value>
>();
return std::move(initial).start(
[consumer, previous](Value &&value) {
if (!(*previous) || (**previous) != value) {
*previous = value;
consumer.put_next(std::move(value));
}
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer] {
consumer.put_done();
});
};
}
};
} // namespace details
inline auto distinct_until_changed()
-> details::distinct_until_changed_helper {
return details::distinct_until_changed_helper();
}
} // namespace rpl

View file

@ -20,36 +20,50 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include "producer.h"
#include <rpl/producer.h>
#include <rpl/single.h>
#include <rpl/then.h>
#include "base/algorithm.h"
#include "base/assertion.h"
#include "base/index_based_iterator.h"
namespace rpl {
template <typename Value>
// Currently not thread-safe :(
template <typename Value = empty_value>
class event_stream {
public:
event_stream();
event_stream(event_stream &&other);
void fire(Value &&value);
void fire_copy(const Value &value) {
auto copy = value;
fire(std::move(copy));
}
producer<Value, no_error> events() const;
producer<Value, no_error> events_starting_with(
Value &&value) const {
return single(std::move(value)) | then(events());
}
producer<Value, no_error> events_starting_with_copy(
const Value &value) const {
auto copy = value;
return events_starting_with(std::move(copy));
}
~event_stream();
private:
std::weak_ptr<std::vector<consumer<Value, no_error>>> weak() const {
return _consumers;
}
std::weak_ptr<std::vector<consumer<Value, no_error>>> weak() const;
std::shared_ptr<std::vector<consumer<Value, no_error>>> _consumers;
mutable std::shared_ptr<std::vector<consumer<Value, no_error>>> _consumers;
};
template <typename Value>
event_stream<Value>::event_stream()
: _consumers(std::make_shared<std::vector<consumer<Value, no_error>>>()) {
event_stream<Value>::event_stream() {
}
template <typename Value>
@ -59,7 +73,10 @@ event_stream<Value>::event_stream(event_stream &&other)
template <typename Value>
void event_stream<Value>::fire(Value &&value) {
Expects(_consumers != nullptr);
if (!_consumers) {
return;
}
auto &consumers = *_consumers;
auto begin = base::index_based_begin(consumers);
auto end = base::index_based_end(consumers);
@ -97,7 +114,8 @@ void event_stream<Value>::fire(Value &&value) {
template <typename Value>
producer<Value, no_error> event_stream<Value>::events() const {
return producer<Value, no_error>([weak = weak()](consumer<Value, no_error> consumer) {
return producer<Value, no_error>([weak = weak()](
const consumer<Value, no_error> &consumer) {
if (auto strong = weak.lock()) {
auto result = [weak, consumer] {
if (auto strong = weak.lock()) {
@ -114,6 +132,15 @@ producer<Value, no_error> event_stream<Value>::events() const {
});
}
template <typename Value>
std::weak_ptr<std::vector<consumer<Value, no_error>>> event_stream<Value>::weak() const {
if (!_consumers) {
_consumers = std::make_shared<std::vector<consumer<Value, no_error>>>();
}
return _consumers;
}
template <typename Value>
event_stream<Value>::~event_stream() {
if (_consumers) {
@ -123,4 +150,11 @@ event_stream<Value>::~event_stream() {
}
}
template <typename Value>
inline auto to_stream(event_stream<Value> &stream) {
return on_next([&stream](Value &&value) {
stream.fire(std::move(value));
});
}
} // namespace rpl

View file

@ -0,0 +1,37 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <typename Value, typename Error>
producer<Value, std::decay_t<Error>> fail(Error &&error) {
using consumer_t = consumer<Value, std::decay_t<Error>>;
return [error = std::forward<Error>(error)](
const consumer_t &consumer) mutable {
consumer.put_error(std::move(error));
return lifetime();
};
}
} // namespace rpl

View file

@ -0,0 +1,75 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
namespace details {
template <typename Predicate>
class filter_helper {
public:
template <typename OtherPredicate>
filter_helper(OtherPredicate &&predicate)
: _predicate(std::forward<OtherPredicate>(predicate)) {
}
template <typename Value, typename Error>
rpl::producer<Value, Error> operator()(
rpl::producer<Value, Error> &&initial) {
return [
initial = std::move(initial),
predicate = std::move(_predicate)
](
const consumer<Value, Error> &consumer) mutable {
return std::move(initial).start(
[
consumer,
predicate = std::move(predicate)
](Value &&value) {
const auto &immutable = value;
if (predicate(immutable)) {
consumer.put_next(std::move(value));
}
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer] {
consumer.put_done();
});
};
}
private:
Predicate _predicate;
};
} // namespace details
template <typename Predicate>
auto filter(Predicate &&predicate)
-> details::filter_helper<std::decay_t<Predicate>> {
return details::filter_helper<std::decay_t<Predicate>>(
std::forward<Predicate>(predicate));
}
} // namespace rpl

View file

@ -0,0 +1,82 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
namespace details {
class flatten_latest_helper {
public:
template <typename Value, typename Error>
rpl::producer<Value, Error> operator()(
rpl::producer<
rpl::producer<Value, Error>,
Error
> &&initial) const {
return [initial = std::move(initial)](
const consumer<Value, Error> &consumer) mutable {
auto state = std::make_shared<State>();
return std::move(initial).start(
[consumer, state](rpl::producer<Value, Error> &&inner) {
state->finished = false;
state->alive = std::move(inner).start(
[consumer](Value &&value) {
consumer.put_next(std::move(value));
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer, state] {
if (state->finished) {
consumer.put_done();
} else {
state->finished = true;
}
});
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer, state] {
if (state->finished) {
consumer.put_done();
} else {
state->finished = true;
}
});
};
}
private:
struct State {
lifetime alive;
bool finished = false;
};
};
} // namespace details
inline auto flatten_latest()
-> details::flatten_latest_helper {
return details::flatten_latest_helper();
}
} // namespace rpl

View file

@ -0,0 +1,76 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
namespace details {
template <typename Transform>
class map_helper {
public:
template <typename OtherTransform>
map_helper(OtherTransform &&transform)
: _transform(std::forward<OtherTransform>(transform)) {
}
template <
typename Value,
typename Error,
typename NewValue = decltype(
std::declval<Transform>()(std::declval<Value>())
)>
rpl::producer<NewValue, Error> operator()(
rpl::producer<Value, Error> &&initial) {
return [
initial = std::move(initial),
transform = std::move(_transform)
](const consumer<NewValue, Error> &consumer) mutable {
return std::move(initial).start(
[
consumer,
transform = std::move(transform)
](Value &&value) {
consumer.put_next(transform(std::move(value)));
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer] {
consumer.put_done();
});
};
}
private:
Transform _transform;
};
} // namespace details
template <typename Transform>
auto map(Transform &&transform)
-> details::map_helper<std::decay_t<Transform>> {
return details::map_helper<std::decay_t<Transform>>(
std::forward<Transform>(transform));
}
} // namespace rpl

View file

@ -0,0 +1,34 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <typename Value = empty_value, typename Error = no_error>
producer<Value, Error> never() {
return [](const consumer<Value, Error> &consumer) mutable {
return lifetime();
};
}
} // namespace rpl

View file

@ -0,0 +1,248 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#include "catch.hpp"
#include <rpl/rpl.h>
#include <string>
using namespace rpl;
class OnDestructor {
public:
OnDestructor(base::lambda_once<void()> callback)
: _callback(std::move(callback)) {
}
~OnDestructor() {
if (_callback) {
_callback();
}
}
private:
base::lambda_once<void()> _callback;
};
class InvokeCounter {
public:
InvokeCounter(
const std::shared_ptr<int> &copyCounter,
const std::shared_ptr<int> &moveCounter)
: _copyCounter(copyCounter)
, _moveCounter(moveCounter) {
}
InvokeCounter(const InvokeCounter &other)
: _copyCounter(other._copyCounter)
, _moveCounter(other._moveCounter) {
if (_copyCounter) {
++*_copyCounter;
}
}
InvokeCounter(InvokeCounter &&other)
: _copyCounter(base::take(other._copyCounter))
, _moveCounter(base::take(other._moveCounter)) {
if (_moveCounter) {
++*_moveCounter;
}
}
InvokeCounter &operator=(const InvokeCounter &other) {
_copyCounter = other._copyCounter;
_moveCounter = other._moveCounter;
if (_copyCounter) {
++*_copyCounter;
}
}
InvokeCounter &operator=(InvokeCounter &&other) {
_copyCounter = base::take(other._copyCounter);
_moveCounter = base::take(other._moveCounter);
if (_moveCounter) {
++*_moveCounter;
}
}
private:
std::shared_ptr<int> _copyCounter;
std::shared_ptr<int> _moveCounter;
};
TEST_CASE("basic operators tests", "[rpl::operators]") {
SECTION("single test") {
auto sum = std::make_shared<int>(0);
auto doneGenerated = std::make_shared<bool>(false);
auto destroyed = std::make_shared<bool>(false);
auto copyCount = std::make_shared<int>(0);
auto moveCount = std::make_shared<int>(0);
{
InvokeCounter counter(copyCount, moveCount);
auto destroyCalled = std::make_shared<OnDestructor>([=] {
*destroyed = true;
});
rpl::lifetime lifetime;
single(std::move(counter))
| on_next([=](InvokeCounter&&) {
(void)destroyCalled;
++*sum;
}) | on_error([=](no_error) {
(void)destroyCalled;
}) | on_done([=] {
(void)destroyCalled;
*doneGenerated = true;
}) | start(lifetime);
}
REQUIRE(*sum == 1);
REQUIRE(*doneGenerated);
REQUIRE(*destroyed);
REQUIRE(*copyCount == 0);
}
SECTION("then test") {
auto sum = std::make_shared<int>(0);
auto doneGenerated = std::make_shared<bool>(false);
auto destroyed = std::make_shared<bool>(false);
auto copyCount = std::make_shared<int>(0);
auto moveCount = std::make_shared<int>(0);
{
auto testing = complete<InvokeCounter>();
for (auto i = 0; i != 5; ++i) {
InvokeCounter counter(copyCount, moveCount);
testing = std::move(testing)
| then(single(std::move(counter)));
}
auto destroyCalled = std::make_shared<OnDestructor>([=] {
*destroyed = true;
});
rpl::lifetime lifetime;
std::move(testing)
| then(complete<InvokeCounter>())
| on_next([=](InvokeCounter&&) {
(void)destroyCalled;
++*sum;
}) | on_error([=](no_error) {
(void)destroyCalled;
}) | on_done([=] {
(void)destroyCalled;
*doneGenerated = true;
}) | start(lifetime);
}
REQUIRE(*sum == 5);
REQUIRE(*doneGenerated);
REQUIRE(*destroyed);
REQUIRE(*copyCount == 0);
}
SECTION("map test") {
auto sum = std::make_shared<std::string>("");
{
rpl::lifetime lifetime;
single(1)
| then(single(2))
| then(single(3))
| then(single(4))
| then(single(5))
| map([](int value) {
return std::to_string(value);
}) | on_next([=](std::string &&value) {
*sum += std::move(value) + ' ';
}) | start(lifetime);
}
REQUIRE(*sum == "1 2 3 4 5 ");
}
SECTION("deferred test") {
auto launched = std::make_shared<int>(0);
auto checked = std::make_shared<int>(0);
{
rpl::lifetime lifetime;
auto make_next = [=] {
return deferred([=] {
return single(++*launched);
});
};
make_next()
| then(make_next())
| then(make_next())
| then(make_next())
| then(make_next())
| on_next([=](int value) {
REQUIRE(++*checked == *launched);
REQUIRE(*checked == value);
}) | start(lifetime);
REQUIRE(*launched == 5);
}
}
SECTION("filter test") {
auto sum = std::make_shared<std::string>("");
{
rpl::lifetime lifetime;
single(1)
| then(single(1))
| then(single(2))
| then(single(2))
| then(single(3))
| filter([](int value) { return value != 2; })
| map([](int value) {
return std::to_string(value);
}) | on_next([=](std::string &&value) {
*sum += std::move(value) + ' ';
}) | start(lifetime);
}
REQUIRE(*sum == "1 1 3 ");
}
SECTION("distinct_until_changed test") {
auto sum = std::make_shared<std::string>("");
{
rpl::lifetime lifetime;
single(1)
| then(single(1))
| then(single(2))
| then(single(2))
| then(single(3))
| distinct_until_changed()
| map([](int value) {
return std::to_string(value);
}) | on_next([=](std::string &&value) {
*sum += std::move(value) + ' ';
}) | start(lifetime);
}
REQUIRE(*sum == "1 2 3 ");
}
SECTION("flatten_latest test") {
auto sum = std::make_shared<std::string>("");
{
rpl::lifetime lifetime;
single(single(1) | then(single(2)))
| then(single(single(3) | then(single(4))))
| then(single(single(5) | then(single(6))))
| flatten_latest()
| map([](int value) {
return std::to_string(value);
}) | on_next([=](std::string &&value) {
*sum += std::move(value) + ' ';
}) | start(lifetime);
}
REQUIRE(*sum == "1 2 3 4 5 6 ");
}
}

View file

@ -21,19 +21,70 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#pragma once
#include "base/lambda.h"
#include "rpl/consumer.h"
#include "rpl/lifetime.h"
#include <rpl/consumer.h>
#include <rpl/lifetime.h>
namespace rpl {
namespace details {
template <typename Value, typename Error = no_error>
template <typename Lambda>
class mutable_lambda_wrap {
public:
mutable_lambda_wrap(Lambda &&lambda)
: _lambda(std::move(lambda)) {
}
template <typename... Args>
auto operator()(Args&&... args) const {
return (const_cast<mutable_lambda_wrap*>(this)->_lambda)(
std::forward<Args>(args)...);
}
private:
Lambda _lambda;
};
// Type-erased copyable mutable lambda using base::lambda.
template <typename Function> class mutable_lambda;
template <typename Return, typename ...Args>
class mutable_lambda<Return(Args...)> {
public:
// Copy / move construct / assign from an arbitrary type.
template <
typename Lambda,
typename = std::enable_if_t<std::is_convertible<
decltype(std::declval<Lambda>()(std::declval<Args>()...)),
Return
>::value>>
mutable_lambda(Lambda other) : _implementation(mutable_lambda_wrap<Lambda>(std::move(other))) {
}
template <
typename ...OtherArgs,
typename = std::enable_if_t<(sizeof...(Args) == sizeof...(OtherArgs))>>
Return operator()(OtherArgs&&... args) {
return _implementation(std::forward<OtherArgs>(args)...);
}
private:
base::lambda<Return(Args...)> _implementation;
};
} // namespace details
template <typename Value = empty_value, typename Error = no_error>
class producer {
public:
using value_type = Value;
using error_type = Error;
using consumer_type = consumer<Value, Error>;
template <typename Generator, typename = std::enable_if<std::is_convertible<
decltype(std::declval<Generator>()(std::declval<consumer<Value, Error>>())),
decltype(std::declval<Generator>()(std::declval<consumer_type>())),
lifetime
>::value>>
producer(Generator &&generator);
@ -48,10 +99,25 @@ public:
lifetime start(
OnNext &&next,
OnError &&error,
OnDone &&done) const;
OnDone &&done) &&;
template <
typename OnNext,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
lifetime start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done) const &;
lifetime start_existing(const consumer_type &consumer) &&;
private:
base::lambda<lifetime(consumer<Value, Error>)> _generator;
details::mutable_lambda<
lifetime(const consumer_type &)> _generator;
};
@ -72,13 +138,37 @@ template <
lifetime producer<Value, Error>::start(
OnNext &&next,
OnError &&error,
OnDone &&done) const {
auto result = consumer<Value, Error>(
OnDone &&done) && {
return std::move(*this).start_existing(consumer<Value, Error>(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done)));
}
template <typename Value, typename Error>
template <
typename OnNext,
typename OnError,
typename OnDone,
typename,
typename,
typename>
lifetime producer<Value, Error>::start_copy(
OnNext &&next,
OnError &&error,
OnDone &&done) const & {
auto copy = *this;
return std::move(copy).start(
std::forward<OnNext>(next),
std::forward<OnError>(error),
std::forward<OnDone>(done));
result.set_lifetime(_generator(result));
return [result] { result.terminate(); };
}
template <typename Value, typename Error>
lifetime producer<Value, Error>::start_existing(
const consumer_type &consumer) && {
consumer.add_lifetime(std::move(_generator)(consumer));
return [consumer] { consumer.terminate(); };
}
template <typename Value, typename Error>
@ -91,21 +181,21 @@ template <
typename Error,
typename Method,
typename = decltype(std::declval<Method>()(std::declval<producer<Value, Error>>()))>
inline decltype(auto) operator|(producer<Value, Error> &&producer, Method &&method) {
inline auto operator|(producer<Value, Error> &&producer, Method &&method) {
return std::forward<Method>(method)(std::move(producer));
}
template <typename OnNext>
inline decltype(auto) bind_on_next(OnNext &&handler) {
inline auto bind_on_next(OnNext &&handler) {
return [handler = std::forward<OnNext>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<no_value, error_type>([
existing = std::move(existing),
handler = std::forward<OnNext>(handler)
](consumer<no_value, error_type> consumer) {
return existing.start([handler = std::decay_t<OnNext>(handler)](
value_type &&value) {
handler = std::move(handler)
](const consumer<no_value, error_type> &consumer) mutable {
return std::move(existing).start(
[handler = std::move(handler)](value_type &&value) {
handler(std::move(value));
}, [consumer](error_type &&error) {
consumer.put_error(std::move(error));
@ -117,17 +207,18 @@ inline decltype(auto) bind_on_next(OnNext &&handler) {
}
template <typename OnError>
inline decltype(auto) bind_on_error(OnError &&handler) {
inline auto bind_on_error(OnError &&handler) {
return [handler = std::forward<OnError>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<value_type, no_error>([
existing = std::move(existing),
handler = std::forward<OnError>(handler)
](consumer<value_type, no_error> consumer) {
return existing.start([consumer](value_type &&value) {
handler = std::move(handler)
](const consumer<value_type, no_error> &consumer) mutable {
return std::move(existing).start(
[consumer](value_type &&value) {
consumer.put_next(std::move(value));
}, [handler = std::decay_t<OnError>(handler)](error_type &&error) {
}, [handler = std::move(handler)](error_type &&error) {
handler(std::move(error));
}, [consumer] {
consumer.put_done();
@ -137,19 +228,20 @@ inline decltype(auto) bind_on_error(OnError &&handler) {
}
template <typename OnDone>
inline decltype(auto) bind_on_done(OnDone &&handler) {
inline auto bind_on_done(OnDone &&handler) {
return [handler = std::forward<OnDone>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<value_type, error_type>([
existing = std::move(existing),
handler = std::forward<OnDone>(handler)
](consumer<value_type, error_type> consumer) {
return existing.start([consumer](value_type &&value) {
handler = std::move(handler)
](const consumer<value_type, error_type> &consumer) mutable {
return std::move(existing).start(
[consumer](value_type &&value) {
consumer.put_next(std::move(value));
}, [consumer](error_type &&value) {
consumer.put_error(std::move(value));
}, [handler = std::decay_t<OnDone>(handler)] {
}, [handler = std::move(handler)] {
handler();
});
});
@ -500,10 +592,11 @@ inline void operator|(
OnError,
OnDone> &&producer_with_next_error_done,
lifetime_holder &&lifetime) {
lifetime.alive_while.add(producer_with_next_error_done.producer.start(
std::move(producer_with_next_error_done.next),
std::move(producer_with_next_error_done.error),
std::move(producer_with_next_error_done.done)));
lifetime.alive_while.add(
std::move(producer_with_next_error_done.producer).start(
std::move(producer_with_next_error_done.next),
std::move(producer_with_next_error_done.error),
std::move(producer_with_next_error_done.done)));
}
template <typename Value, typename Error>

View file

@ -20,8 +20,8 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#include "catch.hpp"
#include "rpl/producer.h"
#include "rpl/event_stream.h"
#include <rpl/producer.h>
#include <rpl/event_stream.h>
using namespace rpl;
@ -52,7 +52,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
*destroyed = true;
});
{
producer<int, no_error>([=](auto consumer) {
producer<int, no_error>([=](auto &&consumer) {
(void)destroyCaller;
consumer.put_next(1);
consumer.put_next(2);
@ -82,7 +82,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("producer error test") {
auto errorGenerated = std::make_shared<bool>(false);
{
producer<no_value, bool>([=](auto consumer) {
producer<no_value, bool>([=](auto &&consumer) {
consumer.put_error(true);
return lifetime();
}).start([=](no_value) {
@ -99,16 +99,16 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
{
auto lifetimes = lifetime();
{
auto testProducer = producer<no_value, no_error>([=](auto consumer) {
auto testProducer = producer<no_value, no_error>([=](auto &&consumer) {
return [=] {
++*lifetimeEndCount;
};
});
lifetimes.add(testProducer.start([=](no_value) {
lifetimes.add(testProducer.start_copy([=](no_value) {
}, [=](no_error) {
}, [=] {
}));
lifetimes.add(testProducer.start([=](no_value) {
lifetimes.add(std::move(testProducer).start([=](no_value) {
}, [=](no_error) {
}, [=] {
}));
@ -123,8 +123,8 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto lifetimeEndCount = std::make_shared<int>(0);
auto saved = lifetime();
{
saved = producer<int, no_error>([=](auto consumer) {
auto inner = producer<int, no_error>([=](auto consumer) {
saved = producer<int, no_error>([=](auto &&consumer) {
auto inner = producer<int, no_error>([=](auto &&consumer) {
consumer.put_next(1);
consumer.put_next(2);
consumer.put_next(3);
@ -135,12 +135,12 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto result = lifetime([=] {
++*lifetimeEndCount;
});
result.add(inner.start([=](int value) {
result.add(inner.start_copy([=](int value) {
consumer.put_next_copy(value);
}, [=](no_error) {
}, [=] {
}));
result.add(inner.start([=](int value) {
result.add(std::move(inner).start([=](int value) {
consumer.put_next_copy(value);
}, [=](no_error) {
}, [=] {
@ -157,7 +157,9 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
saved.destroy();
REQUIRE(*lifetimeEndCount == 3);
}
}
TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
SECTION("event_stream basic test") {
auto sum = std::make_shared<int>(0);
event_stream<int> stream;
@ -326,7 +328,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
auto doneGenerated = std::make_shared<bool>(false);
{
auto alive = lifetime();
producer<int, no_error>([=](auto consumer) {
producer<int, no_error>([=](auto &&consumer) {
consumer.put_next(1);
consumer.put_next(2);
consumer.put_next(3);
@ -338,7 +340,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
*doneGenerated = true;
}) | start(alive);
producer<no_value, int>([=](auto consumer) {
producer<no_value, int>([=](auto &&consumer) {
consumer.put_error(4);
return lifetime();
}) | bind_on_error([=](int value) {
@ -356,7 +358,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
auto dones = std::make_shared<int>(0);
{
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(1);
consumer.put_done();
return lifetime();
@ -364,7 +366,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
*sum += value;
}) | start(alive);
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(11);
consumer.put_error(111);
return lifetime();
@ -372,7 +374,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
*sum += value;
}) | start(alive);
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(1111);
consumer.put_done();
return lifetime();
@ -380,7 +382,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
*dones += 1;
}) | start(alive);
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(11111);
consumer.put_next(11112);
consumer.put_next(11113);
@ -394,7 +396,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
}
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(111111);
consumer.put_next(111112);
consumer.put_next(111113);
@ -406,7 +408,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
*dones += 11;
}) | start(alive);
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_error(1111111);
return lifetime();
}) | on_error([=](int value) {
@ -434,7 +436,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
for (int i = 0; i != 3; ++i) {
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
producer<int, int>([=](auto &&consumer) {
consumer.put_next(1);
consumer.put_done();
return lifetime();

View file

@ -0,0 +1,40 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/lifetime.h>
#include <rpl/consumer.h>
#include <rpl/producer.h>
#include <rpl/event_stream.h>
#include <rpl/single.h>
#include <rpl/complete.h>
#include <rpl/fail.h>
#include <rpl/never.h>
#include <rpl/then.h>
#include <rpl/deferred.h>
#include <rpl/map.h>
#include <rpl/filter.h>
#include <rpl/distinct_until_changed.h>
#include <rpl/flatten_latest.h>
#include <rpl/before_next.h>

View file

@ -0,0 +1,46 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <typename Value, typename Error = no_error>
producer<std::decay_t<Value>, Error> single(Value &&value) {
using consumer_t = consumer<std::decay_t<Value>, Error>;
return [value = std::forward<Value>(value)](
const consumer_t &consumer) mutable {
consumer.put_next(std::move(value));
consumer.put_done();
return lifetime();
};
}
template <typename Error = no_error>
producer<empty_value, Error> single() {
return [](const consumer<empty_value, Error> &consumer) {
consumer.put_next({});
consumer.put_done();
};
}
} // namespace rpl

View file

@ -0,0 +1,58 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
namespace rpl {
template <typename Value, typename Error>
auto then(producer<Value, Error> &&following) {
return [following = std::move(following)](
producer<Value, Error> &&initial) mutable
-> producer<Value, Error> {
return [
initial = std::move(initial),
following = std::move(following)
](const consumer<Value, Error> &consumer) mutable {
return std::move(initial).start(
[consumer](Value &&value) {
consumer.put_next(std::move(value));
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [
consumer,
following = std::move(following)
]() mutable {
consumer.add_lifetime(std::move(following).start(
[consumer](Value &&value) {
consumer.put_next(std::move(value));
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer] {
consumer.put_done();
}));
});
};
};
}
} // namespace rpl

View file

@ -401,10 +401,6 @@
<(src_loc)/profile/profile_userpic_button.h
<(src_loc)/profile/profile_widget.cpp
<(src_loc)/profile/profile_widget.h
<(src_loc)/rpl/consumer.h
<(src_loc)/rpl/event_stream.h
<(src_loc)/rpl/lifetime.h
<(src_loc)/rpl/producer.h
<(src_loc)/settings/settings_advanced_widget.cpp
<(src_loc)/settings/settings_advanced_widget.h
<(src_loc)/settings/settings_background_widget.cpp

View file

@ -92,16 +92,29 @@
'<(src_loc)/base/flat_set_tests.cpp',
],
}, {
'target_name': 'tests_producer',
'target_name': 'tests_rpl',
'includes': [
'common_test.gypi',
],
'sources': [
'<(src_loc)/rpl/before_next.h',
'<(src_loc)/rpl/complete.h',
'<(src_loc)/rpl/consumer.h',
'<(src_loc)/rpl/deferred.h',
'<(src_loc)/rpl/distinct_until_changed.h',
'<(src_loc)/rpl/event_stream.h',
'<(src_loc)/rpl/fail.h',
'<(src_loc)/rpl/filter.h',
'<(src_loc)/rpl/flatten_latest.h',
'<(src_loc)/rpl/lifetime.h',
'<(src_loc)/rpl/map.h',
'<(src_loc)/rpl/never.h',
'<(src_loc)/rpl/operators_tests.cpp',
'<(src_loc)/rpl/producer.h',
'<(src_loc)/rpl/producer_tests.cpp',
'<(src_loc)/rpl/rpl.h',
'<(src_loc)/rpl/single.h',
'<(src_loc)/rpl/then.h',
],
}],
}

View file

@ -2,4 +2,4 @@ tests_algorithm
tests_flags
tests_flat_map
tests_flat_set
tests_producer
tests_rpl