mirror of
https://github.com/vale981/tdesktop
synced 2025-03-09 12:36:39 -04:00

Currently the build without implicitly included precompiled header is not supported anyway (because Qt MOC source files do not include stdafx.h, they include plain headers). So when we decide to support building without implicitly included precompiled headers we'll have to fix all the headers anyway.
529 lines
15 KiB
C++
529 lines
15 KiB
C++
/*
|
|
This file is part of Telegram Desktop,
|
|
the official desktop version of Telegram messaging app, see https://telegram.org
|
|
|
|
Telegram Desktop is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
In addition, as a special exception, the copyright holders give permission
|
|
to link the code of portions of this program with the OpenSSL library.
|
|
|
|
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
|
|
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
|
|
*/
|
|
#include "mtproto/session.h"
|
|
|
|
#include "mtproto/connection.h"
|
|
|
|
namespace MTP {
|
|
namespace internal {
|
|
|
|
void SessionData::clear(Instance *instance) {
|
|
RPCCallbackClears clearCallbacks;
|
|
{
|
|
QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex());
|
|
mtpResponseMap::const_iterator end = haveReceived.cend();
|
|
clearCallbacks.reserve(haveSent.size() + wereAcked.size());
|
|
for (mtpRequestMap::const_iterator i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
|
|
mtpRequestId requestId = i.value()->requestId;
|
|
if (haveReceived.find(requestId) == end) {
|
|
clearCallbacks.push_back(requestId);
|
|
}
|
|
}
|
|
for (mtpRequestIdsMap::const_iterator i = toResend.cbegin(), e = toResend.cend(); i != e; ++i) {
|
|
mtpRequestId requestId = i.value();
|
|
if (haveReceived.find(requestId) == end) {
|
|
clearCallbacks.push_back(requestId);
|
|
}
|
|
}
|
|
for (mtpRequestIdsMap::const_iterator i = wereAcked.cbegin(), e = wereAcked.cend(); i != e; ++i) {
|
|
mtpRequestId requestId = i.value();
|
|
if (haveReceived.find(requestId) == end) {
|
|
clearCallbacks.push_back(requestId);
|
|
}
|
|
}
|
|
}
|
|
{
|
|
QWriteLocker locker(haveSentMutex());
|
|
haveSent.clear();
|
|
}
|
|
{
|
|
QWriteLocker locker(toResendMutex());
|
|
toResend.clear();
|
|
}
|
|
{
|
|
QWriteLocker locker(wereAckedMutex());
|
|
wereAcked.clear();
|
|
}
|
|
{
|
|
QWriteLocker locker(receivedIdsMutex());
|
|
receivedIds.clear();
|
|
}
|
|
instance->clearCallbacksDelayed(clearCallbacks);
|
|
}
|
|
|
|
Session::Session(Instance *instance, ShiftedDcId shiftedDcId) : QObject()
|
|
, _instance(instance)
|
|
, data(this)
|
|
, dcWithShift(shiftedDcId) {
|
|
connect(&timeouter, SIGNAL(timeout()), this, SLOT(checkRequestsByTimer()));
|
|
timeouter.start(1000);
|
|
|
|
connect(&sender, SIGNAL(timeout()), this, SLOT(needToResumeAndSend()));
|
|
}
|
|
|
|
void Session::start() {
|
|
createDcData();
|
|
_connection = std::make_unique<Connection>(_instance);
|
|
_connection->start(&data, dcWithShift);
|
|
if (_instance->isKeysDestroyer()) {
|
|
_instance->scheduleKeyDestroy(dcWithShift);
|
|
}
|
|
}
|
|
|
|
void Session::createDcData() {
|
|
if (dc) {
|
|
return;
|
|
}
|
|
dc = _instance->getDcById(dcWithShift);
|
|
|
|
ReadLockerAttempt lock(keyMutex());
|
|
data.setKey(lock ? dc->getKey() : AuthKeyPtr());
|
|
if (lock && dc->connectionInited()) {
|
|
data.setLayerWasInited(true);
|
|
}
|
|
connect(dc.get(), SIGNAL(authKeyCreated()), this, SLOT(authKeyCreatedForDC()), Qt::QueuedConnection);
|
|
connect(dc.get(), SIGNAL(layerWasInited(bool)), this, SLOT(layerWasInitedForDC(bool)), Qt::QueuedConnection);
|
|
}
|
|
|
|
void Session::registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift) {
|
|
return _instance->registerRequest(requestId, dcWithShift);
|
|
}
|
|
|
|
mtpRequestId Session::storeRequest(mtpRequest &request, const RPCResponseHandler &parser) {
|
|
return _instance->storeRequest(request, parser);
|
|
}
|
|
|
|
mtpRequest Session::getRequest(mtpRequestId requestId) {
|
|
return _instance->getRequest(requestId);
|
|
}
|
|
|
|
bool Session::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data
|
|
return _instance->rpcErrorOccured(requestId, onFail, err);
|
|
}
|
|
|
|
void Session::restart() {
|
|
if (_killed) {
|
|
DEBUG_LOG(("Session Error: can't restart a killed session"));
|
|
return;
|
|
}
|
|
emit needToRestart();
|
|
}
|
|
|
|
void Session::stop() {
|
|
if (_killed) {
|
|
DEBUG_LOG(("Session Error: can't kill a killed session"));
|
|
return;
|
|
}
|
|
DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(dcWithShift));
|
|
if (_connection) {
|
|
_connection->kill();
|
|
_instance->queueQuittingConnection(std::move(_connection));
|
|
}
|
|
}
|
|
|
|
void Session::kill() {
|
|
stop();
|
|
_killed = true;
|
|
DEBUG_LOG(("Session Info: marked session dcWithShift %1 as killed").arg(dcWithShift));
|
|
}
|
|
|
|
void Session::unpaused() {
|
|
if (_needToReceive) {
|
|
_needToReceive = false;
|
|
QTimer::singleShot(0, this, SLOT(tryToReceive()));
|
|
}
|
|
}
|
|
|
|
void Session::sendAnything(qint64 msCanWait) {
|
|
if (_killed) {
|
|
DEBUG_LOG(("Session Error: can't send anything in a killed session"));
|
|
return;
|
|
}
|
|
auto ms = getms(true);
|
|
if (msSendCall) {
|
|
if (ms > msSendCall + msWait) {
|
|
msWait = 0;
|
|
} else {
|
|
msWait = (msSendCall + msWait) - ms;
|
|
if (msWait > msCanWait) {
|
|
msWait = msCanWait;
|
|
}
|
|
}
|
|
} else {
|
|
msWait = msCanWait;
|
|
}
|
|
if (msWait) {
|
|
DEBUG_LOG(("MTP Info: dcWithShift %1 can wait for %2ms from current %3").arg(dcWithShift).arg(msWait).arg(msSendCall));
|
|
msSendCall = ms;
|
|
sender.start(msWait);
|
|
} else {
|
|
DEBUG_LOG(("MTP Info: dcWithShift %1 stopped send timer, can wait for %2ms from current %3").arg(dcWithShift).arg(msWait).arg(msSendCall));
|
|
sender.stop();
|
|
msSendCall = 0;
|
|
needToResumeAndSend();
|
|
}
|
|
}
|
|
|
|
void Session::needToResumeAndSend() {
|
|
if (_killed) {
|
|
DEBUG_LOG(("Session Info: can't resume a killed session"));
|
|
return;
|
|
}
|
|
if (!_connection) {
|
|
DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(dcWithShift));
|
|
createDcData();
|
|
_connection = std::make_unique<Connection>(_instance);
|
|
_connection->start(&data, dcWithShift);
|
|
}
|
|
if (_ping) {
|
|
_ping = false;
|
|
emit needToPing();
|
|
} else {
|
|
emit needToSend();
|
|
}
|
|
}
|
|
|
|
void Session::sendPong(quint64 msgId, quint64 pingId) {
|
|
send(MTP_pong(MTP_long(msgId), MTP_long(pingId)));
|
|
}
|
|
|
|
void Session::sendMsgsStateInfo(quint64 msgId, QByteArray data) {
|
|
MTPMsgsStateInfo req(MTP_msgs_state_info(MTP_long(msgId), MTPstring()));
|
|
auto &info = req._msgs_state_info().vinfo._string().v;
|
|
info.resize(data.size());
|
|
if (!data.isEmpty()) {
|
|
memcpy(&info[0], data.constData(), data.size());
|
|
}
|
|
send(req);
|
|
}
|
|
|
|
void Session::checkRequestsByTimer() {
|
|
QVector<mtpMsgId> resendingIds;
|
|
QVector<mtpMsgId> removingIds; // remove very old (10 minutes) containers and resend requests
|
|
QVector<mtpMsgId> stateRequestIds;
|
|
|
|
{
|
|
QReadLocker locker(data.haveSentMutex());
|
|
mtpRequestMap &haveSent(data.haveSentMap());
|
|
uint32 haveSentCount(haveSent.size());
|
|
auto ms = getms(true);
|
|
for (mtpRequestMap::iterator i = haveSent.begin(), e = haveSent.end(); i != e; ++i) {
|
|
mtpRequest &req(i.value());
|
|
if (req->msDate > 0) {
|
|
if (req->msDate + MTPCheckResendTimeout < ms) { // need to resend or check state
|
|
if (mtpRequestData::messageSize(req) < MTPResendThreshold) { // resend
|
|
resendingIds.reserve(haveSentCount);
|
|
resendingIds.push_back(i.key());
|
|
} else {
|
|
req->msDate = ms;
|
|
stateRequestIds.reserve(haveSentCount);
|
|
stateRequestIds.push_back(i.key());
|
|
}
|
|
}
|
|
} else if (unixtime() > (int32)(i.key() >> 32) + MTPContainerLives) {
|
|
removingIds.reserve(haveSentCount);
|
|
removingIds.push_back(i.key());
|
|
}
|
|
}
|
|
}
|
|
|
|
if (stateRequestIds.size()) {
|
|
DEBUG_LOG(("MTP Info: requesting state of msgs: %1").arg(Logs::vector(stateRequestIds)));
|
|
{
|
|
QWriteLocker locker(data.stateRequestMutex());
|
|
for (uint32 i = 0, l = stateRequestIds.size(); i < l; ++i) {
|
|
data.stateRequestMap().insert(stateRequestIds[i], true);
|
|
}
|
|
}
|
|
sendAnything(MTPCheckResendWaiting);
|
|
}
|
|
if (!resendingIds.isEmpty()) {
|
|
for (uint32 i = 0, l = resendingIds.size(); i < l; ++i) {
|
|
DEBUG_LOG(("MTP Info: resending request %1").arg(resendingIds[i]));
|
|
resend(resendingIds[i], MTPCheckResendWaiting);
|
|
}
|
|
}
|
|
if (!removingIds.isEmpty()) {
|
|
RPCCallbackClears clearCallbacks;
|
|
{
|
|
QWriteLocker locker(data.haveSentMutex());
|
|
mtpRequestMap &haveSent(data.haveSentMap());
|
|
for (uint32 i = 0, l = removingIds.size(); i < l; ++i) {
|
|
mtpRequestMap::iterator j = haveSent.find(removingIds[i]);
|
|
if (j != haveSent.cend()) {
|
|
if (j.value()->requestId) {
|
|
clearCallbacks.push_back(j.value()->requestId);
|
|
}
|
|
haveSent.erase(j);
|
|
}
|
|
}
|
|
}
|
|
_instance->clearCallbacksDelayed(clearCallbacks);
|
|
}
|
|
}
|
|
|
|
void Session::onConnectionStateChange(qint32 newState) {
|
|
_instance->onStateChange(dcWithShift, newState);
|
|
}
|
|
|
|
void Session::onResetDone() {
|
|
_instance->onSessionReset(dcWithShift);
|
|
}
|
|
|
|
void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) {
|
|
if (requestId) {
|
|
QWriteLocker locker(data.toSendMutex());
|
|
data.toSendMap().remove(requestId);
|
|
}
|
|
if (msgId) {
|
|
QWriteLocker locker(data.haveSentMutex());
|
|
data.haveSentMap().remove(msgId);
|
|
}
|
|
}
|
|
|
|
void Session::ping() {
|
|
_ping = true;
|
|
sendAnything(0);
|
|
}
|
|
|
|
int32 Session::requestState(mtpRequestId requestId) const {
|
|
int32 result = MTP::RequestSent;
|
|
|
|
bool connected = false;
|
|
if (_connection) {
|
|
int32 s = _connection->state();
|
|
if (s == ConnectedState) {
|
|
connected = true;
|
|
} else if (s == ConnectingState || s == DisconnectedState) {
|
|
if (result < 0 || result == MTP::RequestSent) {
|
|
result = MTP::RequestConnecting;
|
|
}
|
|
} else if (s < 0) {
|
|
if ((result < 0 && s > result) || result == MTP::RequestSent) {
|
|
result = s;
|
|
}
|
|
}
|
|
}
|
|
if (!connected) {
|
|
return result;
|
|
}
|
|
if (!requestId) return MTP::RequestSent;
|
|
|
|
QWriteLocker locker(data.toSendMutex());
|
|
const mtpPreRequestMap &toSend(data.toSendMap());
|
|
mtpPreRequestMap::const_iterator i = toSend.constFind(requestId);
|
|
if (i != toSend.cend()) {
|
|
return MTP::RequestSending;
|
|
} else {
|
|
return MTP::RequestSent;
|
|
}
|
|
}
|
|
|
|
int32 Session::getState() const {
|
|
int32 result = -86400000;
|
|
|
|
if (_connection) {
|
|
int32 s = _connection->state();
|
|
if (s == ConnectedState) {
|
|
return s;
|
|
} else if (s == ConnectingState || s == DisconnectedState) {
|
|
if (result < 0) {
|
|
return s;
|
|
}
|
|
} else if (s < 0) {
|
|
if (result < 0 && s > result) {
|
|
result = s;
|
|
}
|
|
}
|
|
}
|
|
if (result == -86400000) {
|
|
result = DisconnectedState;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
QString Session::transport() const {
|
|
return _connection ? _connection->transport() : QString();
|
|
}
|
|
|
|
mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) {
|
|
mtpRequest request;
|
|
{
|
|
QWriteLocker locker(data.haveSentMutex());
|
|
mtpRequestMap &haveSent(data.haveSentMap());
|
|
|
|
mtpRequestMap::iterator i = haveSent.find(msgId);
|
|
if (i == haveSent.end()) {
|
|
if (sendMsgStateInfo) {
|
|
char cantResend[2] = {1, 0};
|
|
DEBUG_LOG(("Message Info: cant resend %1, request not found").arg(msgId));
|
|
|
|
return send(MTP_msgs_state_info(MTP_long(msgId), MTP_string(std::string(cantResend, cantResend + 1))));
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
request = i.value();
|
|
haveSent.erase(i);
|
|
}
|
|
if (mtpRequestData::isSentContainer(request)) { // for container just resend all messages we can
|
|
DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId));
|
|
const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8);
|
|
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
|
|
resend(ids[i], 10, true);
|
|
}
|
|
return 0xFFFFFFFF;
|
|
} else if (!mtpRequestData::isStateRequest(request)) {
|
|
request->msDate = forceContainer ? 0 : getms(true);
|
|
sendPrepared(request, msCanWait, false);
|
|
{
|
|
QWriteLocker locker(data.toResendMutex());
|
|
data.toResendMap().insert(msgId, request->requestId);
|
|
}
|
|
return request->requestId;
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
void Session::resendMany(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) {
|
|
for (int32 i = 0, l = msgIds.size(); i < l; ++i) {
|
|
resend(msgIds.at(i), msCanWait, forceContainer, sendMsgStateInfo);
|
|
}
|
|
}
|
|
|
|
void Session::resendAll() {
|
|
QVector<mtpMsgId> toResend;
|
|
{
|
|
QReadLocker locker(data.haveSentMutex());
|
|
const mtpRequestMap &haveSent(data.haveSentMap());
|
|
toResend.reserve(haveSent.size());
|
|
for (mtpRequestMap::const_iterator i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
|
|
if (i.value()->requestId) toResend.push_back(i.key());
|
|
}
|
|
}
|
|
for (uint32 i = 0, l = toResend.size(); i < l; ++i) {
|
|
resend(toResend[i], 10, true);
|
|
}
|
|
}
|
|
|
|
void Session::sendPrepared(const mtpRequest &request, TimeMs msCanWait, bool newRequest) { // returns true, if emit of needToSend() is needed
|
|
{
|
|
QWriteLocker locker(data.toSendMutex());
|
|
data.toSendMap().insert(request->requestId, request);
|
|
|
|
if (newRequest) {
|
|
*(mtpMsgId*)(request->data() + 4) = 0;
|
|
*(request->data() + 6) = 0;
|
|
}
|
|
}
|
|
|
|
DEBUG_LOG(("MTP Info: added, requestId %1").arg(request->requestId));
|
|
|
|
sendAnything(msCanWait);
|
|
}
|
|
|
|
QReadWriteLock *Session::keyMutex() const {
|
|
return dc->keyMutex();
|
|
}
|
|
|
|
void Session::authKeyCreatedForDC() {
|
|
DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, emitting authKeyCreated(), dcWithShift %1").arg(dcWithShift));
|
|
data.setKey(dc->getKey());
|
|
emit authKeyCreated();
|
|
}
|
|
|
|
void Session::notifyKeyCreated(AuthKeyPtr &&key) {
|
|
DEBUG_LOG(("AuthKey Info: Session::keyCreated(), setting, dcWithShift %1").arg(dcWithShift));
|
|
dc->setKey(std::move(key));
|
|
}
|
|
|
|
void Session::layerWasInitedForDC(bool wasInited) {
|
|
DEBUG_LOG(("MTP Info: Session::layerWasInitedForDC slot, dcWithShift %1").arg(dcWithShift));
|
|
data.setLayerWasInited(wasInited);
|
|
}
|
|
|
|
void Session::notifyLayerInited(bool wasInited) {
|
|
DEBUG_LOG(("MTP Info: emitting MTProtoDC::layerWasInited(%1), dcWithShift %2").arg(Logs::b(wasInited)).arg(dcWithShift));
|
|
dc->setConnectionInited(wasInited);
|
|
emit dc->layerWasInited(wasInited);
|
|
}
|
|
|
|
void Session::destroyKey() {
|
|
if (!dc) return;
|
|
|
|
if (data.getKey()) {
|
|
DEBUG_LOG(("MTP Info: destroying auth_key for dcWithShift %1").arg(dcWithShift));
|
|
if (data.getKey() == dc->getKey()) {
|
|
dc->destroyKey();
|
|
}
|
|
data.setKey(AuthKeyPtr());
|
|
}
|
|
}
|
|
|
|
int32 Session::getDcWithShift() const {
|
|
return dcWithShift;
|
|
}
|
|
|
|
void Session::tryToReceive() {
|
|
if (_killed) {
|
|
DEBUG_LOG(("Session Error: can't receive in a killed session"));
|
|
return;
|
|
}
|
|
if (paused()) {
|
|
_needToReceive = true;
|
|
return;
|
|
}
|
|
int32 cnt = 0;
|
|
while (true) {
|
|
mtpRequestId requestId;
|
|
mtpResponse response;
|
|
{
|
|
QWriteLocker locker(data.haveReceivedMutex());
|
|
mtpResponseMap &responses(data.haveReceivedMap());
|
|
mtpResponseMap::iterator i = responses.begin();
|
|
if (i == responses.end()) return;
|
|
|
|
requestId = i.key();
|
|
response = i.value();
|
|
responses.erase(i);
|
|
}
|
|
if (requestId <= 0) {
|
|
if (dcWithShift == bareDcId(dcWithShift)) { // call globalCallback only in main session
|
|
_instance->globalCallback(response.constData(), response.constData() + response.size());
|
|
}
|
|
} else {
|
|
_instance->execCallback(requestId, response.constData(), response.constData() + response.size());
|
|
}
|
|
++cnt;
|
|
}
|
|
}
|
|
|
|
Session::~Session() {
|
|
t_assert(_connection == nullptr);
|
|
}
|
|
|
|
MTPrpcError rpcClientError(const QString &type, const QString &description) {
|
|
return MTP_rpc_error(MTP_int(0), MTP_string(("CLIENT_" + type + (description.length() ? (": " + description) : "")).toUtf8().constData()));
|
|
}
|
|
|
|
} // namespace internal
|
|
} // namespace MTP
|