reactive counts test

This commit is contained in:
Theodor Diaconu 2017-11-25 14:45:31 +02:00
parent 83bf0460d7
commit 1723605f09
17 changed files with 463 additions and 14 deletions

View file

@ -1,3 +1,4 @@
import genCountEndpoint from '../query/counts/genEndpoint.server.js';
import createGraph from '../query/lib/createGraph.js'; import createGraph from '../query/lib/createGraph.js';
import recursiveCompose from '../query/lib/recursiveCompose.js'; import recursiveCompose from '../query/lib/recursiveCompose.js';
import hypernova from '../query/hypernova/hypernova.js'; import hypernova from '../query/hypernova/hypernova.js';
@ -52,6 +53,7 @@ export default class Exposure {
} }
this.initCountMethod(); this.initCountMethod();
this.initCountPublication();
} }
_validateAndClean() { _validateAndClean() {
@ -171,7 +173,7 @@ export default class Exposure {
} }
/** /**
* Initializez the method to retrieve the count of the data via Meteor.call * Initializes the method to retrieve the count of the data via Meteor.call
* @returns {*} * @returns {*}
*/ */
initCountMethod() { initCountMethod() {
@ -186,6 +188,25 @@ export default class Exposure {
}) })
} }
/**
* Initializes the reactive endpoint to retrieve the count of the data.
*/
initCountPublication() {
const collection = this.collection;
genCountEndpoint(this.name, {
getCursor(session) {
return collection.find(session.filters, {
fields: {_id: 1},
}, this.userId);
},
getSession(body) {
return { filters: body.$filters || {} };
},
});
}
/** /**
* Initializes security enforcement * Initializes security enforcement
* THINK: Maybe instead of overriding .find, I could store this data of security inside the collection object. * THINK: Maybe instead of overriding .find, I could store this data of security inside the collection object.

View file

@ -0,0 +1,2 @@
export const COUNTS_COLLECTION_CLIENT = '$grapher.counts';
export const COUNTS_COLLECTION_SERVER = '$grapher.counts_requests';

View file

@ -1,3 +1,5 @@
import { check } from 'meteor/check';
import NamedQuery from '../namedQuery.js'; import NamedQuery from '../namedQuery.js';
import ExposeSchema from './schema.js'; import ExposeSchema from './schema.js';
import mergeDeep from './lib/mergeDeep.js'; import mergeDeep from './lib/mergeDeep.js';
@ -5,6 +7,7 @@ import createGraph from '../../query/lib/createGraph.js';
import recursiveCompose from '../../query/lib/recursiveCompose.js'; import recursiveCompose from '../../query/lib/recursiveCompose.js';
import prepareForProcess from '../../query/lib/prepareForProcess.js'; import prepareForProcess from '../../query/lib/prepareForProcess.js';
import deepClone from 'lodash.cloneDeep'; import deepClone from 'lodash.cloneDeep';
import genCountEndpoint from '../../query/counts/genEndpoint.server';
_.extend(NamedQuery.prototype, { _.extend(NamedQuery.prototype, {
expose(config = {}) { expose(config = {}) {
@ -19,6 +22,8 @@ _.extend(NamedQuery.prototype, {
ExposeSchema.clean(config); ExposeSchema.clean(config);
this.exposeConfig = config; this.exposeConfig = config;
this._paramSchema = new SimpleSchema(this.exposeConfig.schema);
if (config.method) { if (config.method) {
this._initMethod(); this._initMethod();
} }
@ -32,6 +37,7 @@ _.extend(NamedQuery.prototype, {
} }
this._initCountMethod(); this._initCountMethod();
this._initCountPublication();
if (config.embody) { if (config.embody) {
this.body = mergeDeep( this.body = mergeDeep(
@ -74,7 +80,27 @@ _.extend(NamedQuery.prototype, {
return self.clone(newParams).getCount(); return self.clone(newParams).getCount();
} }
}) });
},
_initCountPublication() {
const self = this;
genCountEndpoint(self.name, {
getCursor(session) {
const query = self.clone(session.params);
return query.getCursorForCounting();
},
getSession(newParams) {
self._validateParams(newParams);
if (self.exposeConfig.firewall) {
self.exposeConfig.firewall.call(this, this.userId, newParams);
}
return { params: newParams };
},
});
}, },
_initPublication() { _initPublication() {
@ -100,13 +126,13 @@ _.extend(NamedQuery.prototype, {
if (params && this.exposeConfig.schema) { if (params && this.exposeConfig.schema) {
if (process.env.NODE_ENV !== 'production') { if (process.env.NODE_ENV !== 'production') {
try { try {
(new SimpleSchema(this.exposeConfig.schema)).validate(params); this._paramSchema.validate(params);
} catch (validationError) { } catch (validationError) {
console.error(`Invalid parameters supplied to query ${this.queryName}`, validationError); console.error(`Invalid parameters supplied to query ${this.queryName}`, validationError);
throw validationError; // rethrow throw validationError; // rethrow
} }
} else { } else {
(new SimpleSchema(this.exposeConfig.schema)).validate(params); this._paramSchema.validate(params);
} }
} }
} }

View file

@ -1,3 +1,4 @@
import CountSubscription from '../query/counts/countSubscription';
import createGraph from '../query/lib/createGraph.js'; import createGraph from '../query/lib/createGraph.js';
import recursiveFetch from '../query/lib/recursiveFetch.js'; import recursiveFetch from '../query/lib/recursiveFetch.js';
import prepareForProcess from '../query/lib/prepareForProcess.js'; import prepareForProcess from '../query/lib/prepareForProcess.js';
@ -22,6 +23,20 @@ export default class extends Base {
return this.subscriptionHandle; return this.subscriptionHandle;
} }
/**
* Subscribe to the counts for this query
*
* @param callback
* @returns {Object}
*/
subscribeCount(callback) {
if (!this._counter) {
this._counter = new CountSubscription(this);
}
return this._counter.subscribe(this.params, callback);
}
/** /**
* Unsubscribe if an existing subscription exists * Unsubscribe if an existing subscription exists
*/ */
@ -33,6 +48,16 @@ export default class extends Base {
this.subscriptionHandle = null; this.subscriptionHandle = null;
} }
/**
* Unsubscribe to the counts if a subscription exists.
*/
unsubscribeCount() {
if (this._counter) {
this._counter.unsubscribe();
this._counter = null;
}
}
/** /**
* Fetches elements in sync using promises * Fetches elements in sync using promises
* @return {*} * @return {*}
@ -90,6 +115,10 @@ export default class extends Base {
* @returns {any} * @returns {any}
*/ */
async getCountSync() { async getCountSync() {
if (this._counter) {
throw new Meteor.Error('This query is reactive, meaning you cannot use promises to fetch the data.');
}
return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params)); return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params));
} }
@ -99,11 +128,15 @@ export default class extends Base {
* @returns {any} * @returns {any}
*/ */
getCount(callback) { getCount(callback) {
if (!callback) { if (this._counter) {
throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count.'); return this._counter.getCount();
} else {
if (!callback) {
throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count or subscribe first.');
} else {
return Meteor.call(this.name + '.count', this.params, callback);
}
} }
return Meteor.call(this.name + '.count', this.params, callback);
} }
/** /**

View file

@ -30,7 +30,16 @@ export default class extends Base {
* @returns {any} * @returns {any}
*/ */
getCount() { getCount() {
return this.getCursorForCounting().count();
}
/**
* Returns the cursor for counting
* This is most likely used for counts cursor
*/
getCursorForCounting() {
let body = prepareForProcess(this.body, this.params); let body = prepareForProcess(this.body, this.params);
return this.collection.find(body.$filters || {}, {}).count();
return this.collection.find(body.$filters || {}, {fields: {_id: 1}});
} }
} }

View file

@ -49,6 +49,22 @@ describe('Named Query', function () {
}) })
}); });
it('Should work with reactive counts', function (done) {
const query = postListExposure.clone({title: 'User Post - 3'});
const handle = query.subscribeCount();
Tracker.autorun(c => {
if (handle.ready()) {
c.stop();
const count = query.getCount();
handle.stop();
assert.equal(count, 6);
done();
}
});
});
it('Should work with reactive queries', function (done) { it('Should work with reactive queries', function (done) {
const query = createQuery({ const query = createQuery({
postListExposure: { postListExposure: {

View file

@ -0,0 +1,7 @@
import { Mongo } from 'meteor/mongo';
import { COUNTS_COLLECTION_CLIENT } from './constants';
/**
* Internal collection used to store counts on the client.
*/
export default new Mongo.Collection(COUNTS_COLLECTION_CLIENT);

View file

@ -0,0 +1 @@
export const COUNTS_COLLECTION_CLIENT = '$grapher.counts';

View file

@ -0,0 +1,109 @@
import { EJSON } from 'meteor/ejson';
import { Meteor } from 'meteor/meteor';
import { ReactiveVar } from 'meteor/reactive-var';
import { Tracker } from 'meteor/tracker';
import Counts from './collection';
import createFauxSubscription from './createFauxSubscription';
import prepareForProcess from '../lib/prepareForProcess.js';
import NamedQueryBase from '../../namedQuery/namedQuery.base';
export default class CountSubscription {
/**
* @param {*} query - The query to use when fetching counts
*/
constructor(query) {
this.accessToken = new ReactiveVar(null);
this.fauxHandle = null;
this.query = query;
}
/**
* Starts a subscription request for reactive counts.
*
* @param {*} arg - The argument to pass to {name}.count.subscribe
* @param {*} callback
*/
subscribe(arg, callback) {
// Don't try to resubscribe if arg hasn't changed
if (EJSON.equals(this.lastArgs, arg) && this.fauxHandle) {
return this.fauxHandle;
}
this.accessToken.set(null);
this.lastArgs = arg;
Meteor.call(this.query.name + '.count.subscribe', arg, (error, token) => {
if (!this._markedForUnsubscribe) {
this.subscriptionHandle = Meteor.subscribe(this.query.name + '.count', token, callback);
this.accessToken.set(token);
this.disconnectComputation = Tracker.autorun(() => this.handleDisconnect());
}
this._markedForUnsubscribe = false;
});
this.fauxHandle = createFauxSubscription(this);
return this.fauxHandle;
}
/**
* Unsubscribes from the count endpoint, if there is such a subscription.
*/
unsubscribe() {
if (this.subscriptionHandle) {
this.disconnectComputation.stop();
this.subscriptionHandle.stop();
} else {
// If we hit this branch, then Meteor.call in subscribe hasn't finished yet
// so set a flag to stop the subscription from being created
this._markedForUnsubscribe = true;
}
this.accessToken.set(null);
this.fauxHandle = null;
this.subscriptionHandle = null;
}
/**
* Reactively fetch current document count. Returns null if the subscription is not ready yet.
*
* @returns {Number|null} - Current document count
*/
getCount() {
const id = this.accessToken.get();
if (id === null) return null;
const doc = Counts.findOne(id);
return doc.count;
}
/**
* All session info gets deleted when the server goes down, so when the client attempts to
* optimistically resume the '.count' publication, the server will throw a 'no-request' error.
*
* This function prevents that by manually stopping and restarting the subscription when the
* connection to the server is lost.
*/
handleDisconnect() {
const status = Meteor.status();
if (!status.connected) {
this._markedForResume = true;
this.fauxHandle = null;
this.subscriptionHandle.stop();
}
if (status.connected && this._markedForResume) {
this._markedForResume = false;
this.subscribe(this.lastArgs);
}
}
/**
* Returns whether or not a subscription request has been made.
*/
isSubscribed() {
return this.accessToken.get() !== null;
}
}

View file

@ -0,0 +1,10 @@
/**
* This method creates a "fake" subscription handle so that users of CountSubscription#subscribe
* have an interface consistent with normal subscriptions.
*
* @param {CountSubscription} countManager
*/
export default (countManager) => ({
ready: () => countManager.accessToken.get() !== null && countManager.subscriptionHandle.ready(),
stop: () => countManager.unsubscribe(),
});

View file

@ -0,0 +1,70 @@
import { check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';
import { Mongo } from 'meteor/mongo';
import { COUNTS_COLLECTION_CLIENT } from './constants';
// XXX: Should this persist between server restarts?
const collection = new Mongo.Collection(null);
/**
* This method generates a reactive count endpoint (a method and publication) for a collection or named query.
*
* @param {String} name - Name of the query or collection
* @param {Function} getCursor - Takes in the user's session document as an argument, and turns that into a Mongo cursor.
* @param {Function} getSession - Takes the subscribe method's argument as its parameter. Should enforce any necessary security constraints. The return value of this function is stored in the session document.
*/
export default (name, { getCursor, getSession }) => {
Meteor.methods({
[name + '.count.subscribe'](paramsOrBody) {
const session = getSession.call(this, paramsOrBody);
const existingSession = collection.findOne({ ...session, userId: this.userId });
// Try to reuse sessions if the user subscribes multiple times with the same data
if (existingSession) {
return existingSession._id;
}
const token = collection.insert({
...session,
query: name,
userId: this.userId,
});
return token;
},
});
Meteor.publish(name + '.count', function(token) {
check(token, String);
const self = this;
const request = collection.findOne({ _id: token, userId: self.userId });
if (!request) {
throw new Error('no-request', `You must acquire a request token via the "${name}.count.subscribe" method first.`);
}
const cursor = getCursor.call(this, request);
// Start counting
let count = 0;
self.added(COUNTS_COLLECTION_CLIENT, token, { count });
const handle = cursor.observeChanges({
added(id) {
count++;
self.changed(COUNTS_COLLECTION_CLIENT, token, { count });
},
removed(id) {
count--;
self.changed(COUNTS_COLLECTION_CLIENT, token, { count });
},
});
self.onStop(() => {
handle.stop();
collection.remove(token);
});
self.ready();
});
};

View file

@ -0,0 +1,6 @@
import { Mongo } from 'meteor/mongo';
import { SimpleSchema } from 'meteor/aldeed:simple-schema';
const PostsCollection = new Mongo.Collection('counts_posts');
export default PostsCollection;

View file

@ -0,0 +1,10 @@
import { createQuery } from 'meteor/cultofcoders:grapher';
const query = createQuery('counts_posts_query', {
counts_posts: {
_id: 1,
text: 1,
},
});
export default query;

View file

@ -0,0 +1,65 @@
import { Tracker } from 'meteor/tracker';
import PostsCollection from './bootstrap/collection.test';
import NamedQuery from './bootstrap/namedQuery.test';
describe('Reactive count tests', function () {
it('Should fetch the initial count', function (done) {
const query = NamedQuery.clone();
const handle = query.subscribeCount();
Tracker.autorun(c => {
if (handle.ready()) {
c.stop();
const count = query.getCount();
handle.stop();
assert.equal(count, 3);
done();
}
});
});
// TODO: Can these tests fail if assert gets called too quickly?
it('Should update when a document is added', function (done) {
const query = NamedQuery.clone();
const handle = query.subscribeCount();
Tracker.autorun(c => {
if (handle.ready()) {
c.stop();
const count = query.getCount();
assert.equal(count, 3);
Meteor.call('addPost', 'text4', (error, newId) => {
const newCount = query.getCount();
assert.equal(newCount, 4);
Meteor.call('removePost', newId);
handle.stop();
done();
});
}
});
});
it('Should update when a document is removed', function (done) {
const query = NamedQuery.clone();
const handle = query.subscribeCount();
Tracker.autorun(c => {
if (handle.ready()) {
c.stop();
const count = query.getCount();
assert.equal(count, 3);
Meteor.call('removePost', 'removeid', (error) => {
const newCount = query.getCount();
assert.equal(newCount, 2);
handle.stop();
done();
});
}
});
});
});

View file

@ -0,0 +1,19 @@
import { Meteor } from 'meteor/meteor';
import PostsCollection from './bootstrap/collection.test';
import query from './bootstrap/namedQuery.test';
query.expose();
PostsCollection.remove({});
PostsCollection.insert({ text: 'text 1' });
PostsCollection.insert({ text: 'text 2' });
PostsCollection.insert({ _id: 'removeid', text: 'text 3' });
Meteor.methods({
addPost(text) {
return PostsCollection.insert({ text });
},
removePost(id) {
PostsCollection.remove({ _id: id });
},
});

View file

@ -1,4 +1,5 @@
import { _ } from 'meteor/underscore'; import { _ } from 'meteor/underscore';
import CountSubscription from './counts/countSubscription';
import createGraph from './lib/createGraph.js'; import createGraph from './lib/createGraph.js';
import recursiveFetch from './lib/recursiveFetch.js'; import recursiveFetch from './lib/recursiveFetch.js';
import prepareForProcess from './lib/prepareForProcess.js'; import prepareForProcess from './lib/prepareForProcess.js';
@ -22,6 +23,23 @@ export default class Query extends Base {
return this.subscriptionHandle; return this.subscriptionHandle;
} }
/**
* Subscribe to the counts for this query
*
* @param callback
* @returns {Object}
*/
subscribeCount(callback) {
if (!this._counter) {
this._counter = new CountSubscription(this);
}
return this._counter.subscribe(
prepareForProcess(this.body, this.params),
callback
);
}
/** /**
* Unsubscribe if an existing subscription exists * Unsubscribe if an existing subscription exists
*/ */
@ -33,6 +51,16 @@ export default class Query extends Base {
this.subscriptionHandle = null; this.subscriptionHandle = null;
} }
/**
* Unsubscribe to the counts if a subscription exists.
*/
unsubscribeCount() {
if (this._counter) {
this._counter.unsubscribe();
this._counter = null;
}
}
/** /**
* Fetches elements in sync using promises * Fetches elements in sync using promises
* @return {*} * @return {*}
@ -90,6 +118,10 @@ export default class Query extends Base {
* @returns {any} * @returns {any}
*/ */
async getCountSync() { async getCountSync() {
if (this._counter) {
throw new Meteor.Error('This query is reactive, meaning you cannot use promises to fetch the data.');
}
return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params)); return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params));
} }
@ -99,11 +131,19 @@ export default class Query extends Base {
* @returns {any} * @returns {any}
*/ */
getCount(callback) { getCount(callback) {
if (!callback) { if (this._counter) {
throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count.'); return this._counter.getCount();
} else {
if (!callback) {
throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count or subscribe first.');
} else {
return Meteor.call(
this.name + '.count',
prepareForProcess(this.body, this.params),
callback
);
}
} }
return Meteor.call(this.name + '.count', prepareForProcess(this.body, this.params), callback);
} }
/** /**

View file

@ -23,13 +23,14 @@ Package.onUse(function (api) {
'ecmascript', 'ecmascript',
'underscore', 'underscore',
'promise', 'promise',
'reactive-var',
'mongo',
'aldeed:simple-schema@1.5.3', 'aldeed:simple-schema@1.5.3',
'matb33:collection-hooks@0.8.4', 'matb33:collection-hooks@0.8.4',
'reywood:publish-composite@1.4.2', 'reywood:publish-composite@1.4.2',
'dburles:mongo-collection-instances@0.3.5', 'dburles:mongo-collection-instances@0.3.5',
'tmeasday:check-npm-versions@0.3.1', 'tmeasday:check-npm-versions@0.3.1',
'meteorhacks:aggregate@1.3.0', 'meteorhacks:aggregate@1.3.0',
'mongo'
]; ];
api.use(packages); api.use(packages);
@ -82,6 +83,10 @@ Package.onTest(function (api) {
api.addFiles('lib/namedQuery/testing/bootstrap/client.js', 'client'); api.addFiles('lib/namedQuery/testing/bootstrap/client.js', 'client');
api.addFiles('lib/namedQuery/testing/bootstrap/server.js', 'server'); api.addFiles('lib/namedQuery/testing/bootstrap/server.js', 'server');
// REACTIVE COUNTS
api.addFiles('lib/query/counts/testing/server.test.js', 'server');
api.addFiles('lib/query/counts/testing/client.test.js', 'client');
api.addFiles('lib/namedQuery/testing/server.test.js', 'server'); api.addFiles('lib/namedQuery/testing/server.test.js', 'server');
api.addFiles('lib/namedQuery/testing/client.test.js', 'client'); api.addFiles('lib/namedQuery/testing/client.test.js', 'client');
}); });