/* This file is part of Telegram Desktop, the official desktop version of Telegram messaging app, see https://telegram.org Telegram Desktop is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. In addition, as a special exception, the copyright holders give permission to link the code of portions of this program with the OpenSSL library. Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org */ #pragma once #include "base/optional.h" #include #include #include #include #include namespace rpl { namespace details { template struct combine_state { combine_state() : accumulated(std::tuple...>()) { } base::optional...>> accumulated; base::optional> latest; int invalid = sizeof...(Values); int working = sizeof...(Values); }; template inline std::tuple combine_make_first( std::tuple...> &&accumulated, std::index_sequence) { return std::make_tuple(std::move(*std::get(accumulated))...); } template class combine_subscribe_one { public: combine_subscribe_one( const consumer_type &consumer, combine_state *state) : _consumer(consumer) , _state(state) { } template void subscribe(producer &&producer) { _consumer.add_lifetime(std::move(producer).start( [consumer = _consumer, state = _state](Value &&value) { if (!state->accumulated) { std::get(*state->latest) = std::move(value); consumer.put_next_copy(*state->latest); } else { auto &accumulated = std::get( *state->accumulated); if (accumulated) { accumulated = std::move(value); } else { accumulated = std::move(value); if (!--state->invalid) { constexpr auto kArity = sizeof...(Values); state->latest = combine_make_first( std::move(*state->accumulated), std::make_index_sequence()); state->accumulated = base::none; consumer.put_next_copy(*state->latest); } } } }, [consumer = _consumer](Error &&error) { consumer.put_error(std::move(error)); }, [consumer = _consumer, state = _state] { if (!--state->working) { consumer.put_done(); } })); } private: const consumer_type &_consumer; combine_state *_state = nullptr; }; template < typename consumer_type, typename ...Values, typename ...Errors, std::size_t ...I> inline void combine_subscribe( const consumer_type &consumer, combine_state *state, std::index_sequence, producer &&...producers) { auto consume = { ( details::combine_subscribe_one< I, consumer_type, Values... >( consumer, state ).subscribe(std::move(producers)), 0)... }; (void)consume; } template inline auto combine_implementation( producer &&...producers) { using CombinedError = details::normalized_variant_t; using Result = producer< std::tuple, CombinedError>; using consumer_type = typename Result::consumer_type; auto result = []( const consumer_type &consumer, producer &...producers) { auto state = consumer.template make_state< details::combine_state>(); constexpr auto kArity = sizeof...(Values); details::combine_subscribe( consumer, state, std::make_index_sequence(), std::move(producers)...); return lifetime(); }; return Result(std::bind( result, std::placeholders::_1, std::move(producers)...)); } template struct combine_just_producers : std::false_type { }; template constexpr bool combine_just_producers_v = combine_just_producers::value; template struct combine_just_producers...> : std::true_type { }; template struct combine_just_producers_list : type_list::extract_to_t { }; template struct combine_result_type; template using combine_result_type_t = typename combine_result_type::type; template struct combine_result_type...> { using type = std::tuple; }; template struct combine_result_type_list : type_list::extract_to_t { }; template using combine_result_type_list_t = typename combine_result_type_list::type; template using combine_producers_no_mapper_t = type_list::chop_last_t; template constexpr bool combine_is_good_mapper(std::true_type) { return is_callable_v< type_list::last_t, combine_result_type_list_t< combine_producers_no_mapper_t >>; } template constexpr bool combine_is_good_mapper(std::false_type) { return false; } template struct combine_producers_with_mapper_list : std::bool_constant< combine_is_good_mapper( combine_just_producers_list< combine_producers_no_mapper_t >())> { }; template struct combine_producers_with_mapper : combine_producers_with_mapper_list> { }; template constexpr bool combine_producers_with_mapper_v = combine_producers_with_mapper::value; template inline decltype(auto) combine_helper( std::true_type, producer &&...producers) { return combine_implementation(std::move(producers)...); } template inline decltype(auto) combine_call( std::index_sequence, Producers &&...producers) { return combine_implementation( argument_mapper::call(std::move(producers)...)...); } template inline decltype(auto) combine_helper( std::false_type, Args &&...args) { constexpr auto kProducersCount = sizeof...(Args) - 1; return combine_call( std::make_index_sequence(), std::forward(args)...) | map(argument_mapper::call( std::forward(args)...)); } } // namespace details template < typename ...Args, typename = std::enable_if_t< details::combine_just_producers_v || details::combine_producers_with_mapper_v>> inline decltype(auto) combine(Args &&...args) { return details::combine_helper( details::combine_just_producers(), std::forward(args)...); } namespace details { template struct combine_vector_state { std::vector> accumulated; std::vector latest; int invalid = 0; int working = 0; }; } // namespace details template inline producer, Error> combine( std::vector> &&producers) { if (producers.empty()) { return complete, Error>(); } using state_type = details::combine_vector_state; using consumer_type = consumer, Error>; return [producers = std::move(producers)]( const consumer_type &consumer) mutable { auto count = producers.size(); auto state = consumer.template make_state(); state->accumulated.resize(count); state->invalid = count; state->working = count; for (auto index = 0; index != count; ++index) { auto &producer = producers[index]; consumer.add_lifetime(std::move(producer).start( [consumer, state, index](Value &&value) { if (state->accumulated.empty()) { state->latest[index] = std::move(value); consumer.put_next_copy(state->latest); } else if (state->accumulated[index]) { state->accumulated[index] = std::move(value); } else { state->accumulated[index] = std::move(value); if (!--state->invalid) { state->latest.reserve( state->accumulated.size()); for (auto &&value : state->accumulated) { state->latest.push_back( std::move(*value)); } base::take(state->accumulated); consumer.put_next_copy(state->latest); } } }, [consumer](Error &&error) { consumer.put_error(std::move(error)); }, [consumer, state] { if (!--state->working) { consumer.put_done(); } })); } return lifetime(); }; } template inline auto combine( std::vector> &&producers, Mapper &&mapper) { return combine(std::move(producers)) | map(std::forward(mapper)); } } // namespace rpl