diff --git a/lib/exposure/exposure.js b/lib/exposure/exposure.js index 1123ee8..c96770b 100644 --- a/lib/exposure/exposure.js +++ b/lib/exposure/exposure.js @@ -1,3 +1,4 @@ +import genCountEndpoint from '../query/counts/genEndpoint.server.js'; import createGraph from '../query/lib/createGraph.js'; import recursiveCompose from '../query/lib/recursiveCompose.js'; import hypernova from '../query/hypernova/hypernova.js'; @@ -52,6 +53,7 @@ export default class Exposure { } this.initCountMethod(); + this.initCountPublication(); } _validateAndClean() { @@ -171,7 +173,7 @@ export default class Exposure { } /** - * Initializez the method to retrieve the count of the data via Meteor.call + * Initializes the method to retrieve the count of the data via Meteor.call * @returns {*} */ initCountMethod() { @@ -186,6 +188,25 @@ export default class Exposure { }) } + /** + * Initializes the reactive endpoint to retrieve the count of the data. + */ + initCountPublication() { + const collection = this.collection; + + genCountEndpoint(this.name, { + getCursor(session) { + return collection.find(session.filters, { + fields: {_id: 1}, + }, this.userId); + }, + + getSession(body) { + return { filters: body.$filters || {} }; + }, + }); + } + /** * Initializes security enforcement * THINK: Maybe instead of overriding .find, I could store this data of security inside the collection object. diff --git a/lib/namedQuery/constants.js b/lib/namedQuery/constants.js new file mode 100644 index 0000000..504dee3 --- /dev/null +++ b/lib/namedQuery/constants.js @@ -0,0 +1,2 @@ +export const COUNTS_COLLECTION_CLIENT = '$grapher.counts'; +export const COUNTS_COLLECTION_SERVER = '$grapher.counts_requests'; \ No newline at end of file diff --git a/lib/namedQuery/expose/extension.js b/lib/namedQuery/expose/extension.js index e5690ce..b512e1c 100644 --- a/lib/namedQuery/expose/extension.js +++ b/lib/namedQuery/expose/extension.js @@ -1,3 +1,5 @@ +import { check } from 'meteor/check'; + import NamedQuery from '../namedQuery.js'; import ExposeSchema from './schema.js'; import mergeDeep from './lib/mergeDeep.js'; @@ -5,6 +7,7 @@ import createGraph from '../../query/lib/createGraph.js'; import recursiveCompose from '../../query/lib/recursiveCompose.js'; import prepareForProcess from '../../query/lib/prepareForProcess.js'; import deepClone from 'lodash.cloneDeep'; +import genCountEndpoint from '../../query/counts/genEndpoint.server'; _.extend(NamedQuery.prototype, { expose(config = {}) { @@ -19,6 +22,8 @@ _.extend(NamedQuery.prototype, { ExposeSchema.clean(config); this.exposeConfig = config; + this._paramSchema = new SimpleSchema(this.exposeConfig.schema); + if (config.method) { this._initMethod(); } @@ -32,6 +37,7 @@ _.extend(NamedQuery.prototype, { } this._initCountMethod(); + this._initCountPublication(); if (config.embody) { this.body = mergeDeep( @@ -74,7 +80,27 @@ _.extend(NamedQuery.prototype, { return self.clone(newParams).getCount(); } - }) + }); + }, + + _initCountPublication() { + const self = this; + + genCountEndpoint(self.name, { + getCursor(session) { + const query = self.clone(session.params); + return query.getCursorForCounting(); + }, + + getSession(newParams) { + self._validateParams(newParams); + if (self.exposeConfig.firewall) { + self.exposeConfig.firewall.call(this, this.userId, newParams); + } + + return { params: newParams }; + }, + }); }, _initPublication() { @@ -100,13 +126,13 @@ _.extend(NamedQuery.prototype, { if (params && this.exposeConfig.schema) { if (process.env.NODE_ENV !== 'production') { try { - (new SimpleSchema(this.exposeConfig.schema)).validate(params); + this._paramSchema.validate(params); } catch (validationError) { console.error(`Invalid parameters supplied to query ${this.queryName}`, validationError); throw validationError; // rethrow } } else { - (new SimpleSchema(this.exposeConfig.schema)).validate(params); + this._paramSchema.validate(params); } } } diff --git a/lib/namedQuery/namedQuery.client.js b/lib/namedQuery/namedQuery.client.js index dfab9c8..9c341ff 100644 --- a/lib/namedQuery/namedQuery.client.js +++ b/lib/namedQuery/namedQuery.client.js @@ -1,3 +1,4 @@ +import CountSubscription from '../query/counts/countSubscription'; import createGraph from '../query/lib/createGraph.js'; import recursiveFetch from '../query/lib/recursiveFetch.js'; import prepareForProcess from '../query/lib/prepareForProcess.js'; @@ -22,6 +23,20 @@ export default class extends Base { return this.subscriptionHandle; } + /** + * Subscribe to the counts for this query + * + * @param callback + * @returns {Object} + */ + subscribeCount(callback) { + if (!this._counter) { + this._counter = new CountSubscription(this); + } + + return this._counter.subscribe(this.params, callback); + } + /** * Unsubscribe if an existing subscription exists */ @@ -33,6 +48,16 @@ export default class extends Base { this.subscriptionHandle = null; } + /** + * Unsubscribe to the counts if a subscription exists. + */ + unsubscribeCount() { + if (this._counter) { + this._counter.unsubscribe(); + this._counter = null; + } + } + /** * Fetches elements in sync using promises * @return {*} @@ -90,6 +115,10 @@ export default class extends Base { * @returns {any} */ async getCountSync() { + if (this._counter) { + throw new Meteor.Error('This query is reactive, meaning you cannot use promises to fetch the data.'); + } + return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params)); } @@ -99,11 +128,15 @@ export default class extends Base { * @returns {any} */ getCount(callback) { - if (!callback) { - throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count.'); + if (this._counter) { + return this._counter.getCount(); + } else { + if (!callback) { + throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count or subscribe first.'); + } else { + return Meteor.call(this.name + '.count', this.params, callback); + } } - - return Meteor.call(this.name + '.count', this.params, callback); } /** diff --git a/lib/namedQuery/namedQuery.server.js b/lib/namedQuery/namedQuery.server.js index cf4969b..26320a4 100644 --- a/lib/namedQuery/namedQuery.server.js +++ b/lib/namedQuery/namedQuery.server.js @@ -30,7 +30,16 @@ export default class extends Base { * @returns {any} */ getCount() { + return this.getCursorForCounting().count(); + } + + /** + * Returns the cursor for counting + * This is most likely used for counts cursor + */ + getCursorForCounting() { let body = prepareForProcess(this.body, this.params); - return this.collection.find(body.$filters || {}, {}).count(); + + return this.collection.find(body.$filters || {}, {fields: {_id: 1}}); } } \ No newline at end of file diff --git a/lib/namedQuery/testing/client.test.js b/lib/namedQuery/testing/client.test.js index f8ea0e5..2eef850 100644 --- a/lib/namedQuery/testing/client.test.js +++ b/lib/namedQuery/testing/client.test.js @@ -49,6 +49,22 @@ describe('Named Query', function () { }) }); + it('Should work with reactive counts', function (done) { + const query = postListExposure.clone({title: 'User Post - 3'}); + + const handle = query.subscribeCount(); + Tracker.autorun(c => { + if (handle.ready()) { + c.stop(); + const count = query.getCount(); + handle.stop(); + + assert.equal(count, 6); + done(); + } + }); + }); + it('Should work with reactive queries', function (done) { const query = createQuery({ postListExposure: { diff --git a/lib/query/counts/collection.js b/lib/query/counts/collection.js new file mode 100644 index 0000000..44c33ed --- /dev/null +++ b/lib/query/counts/collection.js @@ -0,0 +1,7 @@ +import { Mongo } from 'meteor/mongo'; +import { COUNTS_COLLECTION_CLIENT } from './constants'; + +/** + * Internal collection used to store counts on the client. + */ +export default new Mongo.Collection(COUNTS_COLLECTION_CLIENT); diff --git a/lib/query/counts/constants.js b/lib/query/counts/constants.js new file mode 100644 index 0000000..01b6f49 --- /dev/null +++ b/lib/query/counts/constants.js @@ -0,0 +1 @@ +export const COUNTS_COLLECTION_CLIENT = '$grapher.counts'; diff --git a/lib/query/counts/countSubscription.js b/lib/query/counts/countSubscription.js new file mode 100644 index 0000000..6fb543b --- /dev/null +++ b/lib/query/counts/countSubscription.js @@ -0,0 +1,109 @@ +import { EJSON } from 'meteor/ejson'; +import { Meteor } from 'meteor/meteor'; +import { ReactiveVar } from 'meteor/reactive-var'; +import { Tracker } from 'meteor/tracker'; + +import Counts from './collection'; +import createFauxSubscription from './createFauxSubscription'; +import prepareForProcess from '../lib/prepareForProcess.js'; +import NamedQueryBase from '../../namedQuery/namedQuery.base'; + +export default class CountSubscription { + /** + * @param {*} query - The query to use when fetching counts + */ + constructor(query) { + this.accessToken = new ReactiveVar(null); + this.fauxHandle = null; + this.query = query; + } + + /** + * Starts a subscription request for reactive counts. + * + * @param {*} arg - The argument to pass to {name}.count.subscribe + * @param {*} callback + */ + subscribe(arg, callback) { + // Don't try to resubscribe if arg hasn't changed + if (EJSON.equals(this.lastArgs, arg) && this.fauxHandle) { + return this.fauxHandle; + } + + this.accessToken.set(null); + this.lastArgs = arg; + + Meteor.call(this.query.name + '.count.subscribe', arg, (error, token) => { + if (!this._markedForUnsubscribe) { + this.subscriptionHandle = Meteor.subscribe(this.query.name + '.count', token, callback); + this.accessToken.set(token); + + this.disconnectComputation = Tracker.autorun(() => this.handleDisconnect()); + } + + this._markedForUnsubscribe = false; + }); + + this.fauxHandle = createFauxSubscription(this); + return this.fauxHandle; + } + + /** + * Unsubscribes from the count endpoint, if there is such a subscription. + */ + unsubscribe() { + if (this.subscriptionHandle) { + this.disconnectComputation.stop(); + this.subscriptionHandle.stop(); + } else { + // If we hit this branch, then Meteor.call in subscribe hasn't finished yet + // so set a flag to stop the subscription from being created + this._markedForUnsubscribe = true; + } + + this.accessToken.set(null); + this.fauxHandle = null; + this.subscriptionHandle = null; + } + + /** + * Reactively fetch current document count. Returns null if the subscription is not ready yet. + * + * @returns {Number|null} - Current document count + */ + getCount() { + const id = this.accessToken.get(); + if (id === null) return null; + + const doc = Counts.findOne(id); + return doc.count; + } + + /** + * All session info gets deleted when the server goes down, so when the client attempts to + * optimistically resume the '.count' publication, the server will throw a 'no-request' error. + * + * This function prevents that by manually stopping and restarting the subscription when the + * connection to the server is lost. + */ + handleDisconnect() { + const status = Meteor.status(); + if (!status.connected) { + this._markedForResume = true; + this.fauxHandle = null; + this.subscriptionHandle.stop(); + } + + if (status.connected && this._markedForResume) { + this._markedForResume = false; + this.subscribe(this.lastArgs); + } + } + + /** + * Returns whether or not a subscription request has been made. + */ + isSubscribed() { + return this.accessToken.get() !== null; + } +} diff --git a/lib/query/counts/createFauxSubscription.js b/lib/query/counts/createFauxSubscription.js new file mode 100644 index 0000000..f09c98f --- /dev/null +++ b/lib/query/counts/createFauxSubscription.js @@ -0,0 +1,10 @@ +/** + * This method creates a "fake" subscription handle so that users of CountSubscription#subscribe + * have an interface consistent with normal subscriptions. + * + * @param {CountSubscription} countManager + */ +export default (countManager) => ({ + ready: () => countManager.accessToken.get() !== null && countManager.subscriptionHandle.ready(), + stop: () => countManager.unsubscribe(), +}); diff --git a/lib/query/counts/genEndpoint.server.js b/lib/query/counts/genEndpoint.server.js new file mode 100644 index 0000000..bb22903 --- /dev/null +++ b/lib/query/counts/genEndpoint.server.js @@ -0,0 +1,70 @@ +import { check } from 'meteor/check'; +import { Meteor } from 'meteor/meteor'; +import { Mongo } from 'meteor/mongo'; + +import { COUNTS_COLLECTION_CLIENT } from './constants'; + +// XXX: Should this persist between server restarts? +const collection = new Mongo.Collection(null); + +/** + * This method generates a reactive count endpoint (a method and publication) for a collection or named query. + * + * @param {String} name - Name of the query or collection + * @param {Function} getCursor - Takes in the user's session document as an argument, and turns that into a Mongo cursor. + * @param {Function} getSession - Takes the subscribe method's argument as its parameter. Should enforce any necessary security constraints. The return value of this function is stored in the session document. + */ +export default (name, { getCursor, getSession }) => { + Meteor.methods({ + [name + '.count.subscribe'](paramsOrBody) { + const session = getSession.call(this, paramsOrBody); + const existingSession = collection.findOne({ ...session, userId: this.userId }); + + // Try to reuse sessions if the user subscribes multiple times with the same data + if (existingSession) { + return existingSession._id; + } + + const token = collection.insert({ + ...session, + query: name, + userId: this.userId, + }); + + return token; + }, + }); + + Meteor.publish(name + '.count', function(token) { + check(token, String); + const self = this; + const request = collection.findOne({ _id: token, userId: self.userId }); + + if (!request) { + throw new Error('no-request', `You must acquire a request token via the "${name}.count.subscribe" method first.`); + } + + const cursor = getCursor.call(this, request); + + // Start counting + let count = 0; + self.added(COUNTS_COLLECTION_CLIENT, token, { count }); + const handle = cursor.observeChanges({ + added(id) { + count++; + self.changed(COUNTS_COLLECTION_CLIENT, token, { count }); + }, + + removed(id) { + count--; + self.changed(COUNTS_COLLECTION_CLIENT, token, { count }); + }, + }); + + self.onStop(() => { + handle.stop(); + collection.remove(token); + }); + self.ready(); + }); +}; diff --git a/lib/query/counts/testing/bootstrap/collection.test.js b/lib/query/counts/testing/bootstrap/collection.test.js new file mode 100644 index 0000000..c91467c --- /dev/null +++ b/lib/query/counts/testing/bootstrap/collection.test.js @@ -0,0 +1,6 @@ +import { Mongo } from 'meteor/mongo'; +import { SimpleSchema } from 'meteor/aldeed:simple-schema'; + +const PostsCollection = new Mongo.Collection('counts_posts'); + +export default PostsCollection; diff --git a/lib/query/counts/testing/bootstrap/namedQuery.test.js b/lib/query/counts/testing/bootstrap/namedQuery.test.js new file mode 100644 index 0000000..6506717 --- /dev/null +++ b/lib/query/counts/testing/bootstrap/namedQuery.test.js @@ -0,0 +1,10 @@ +import { createQuery } from 'meteor/cultofcoders:grapher'; + +const query = createQuery('counts_posts_query', { + counts_posts: { + _id: 1, + text: 1, + }, +}); + +export default query; diff --git a/lib/query/counts/testing/client.test.js b/lib/query/counts/testing/client.test.js new file mode 100644 index 0000000..d70d5ca --- /dev/null +++ b/lib/query/counts/testing/client.test.js @@ -0,0 +1,65 @@ +import { Tracker } from 'meteor/tracker'; +import PostsCollection from './bootstrap/collection.test'; +import NamedQuery from './bootstrap/namedQuery.test'; + +describe('Reactive count tests', function () { + it('Should fetch the initial count', function (done) { + const query = NamedQuery.clone(); + const handle = query.subscribeCount(); + + Tracker.autorun(c => { + if (handle.ready()) { + c.stop(); + const count = query.getCount(); + handle.stop(); + + assert.equal(count, 3); + done(); + } + }); + }); + + // TODO: Can these tests fail if assert gets called too quickly? + it('Should update when a document is added', function (done) { + const query = NamedQuery.clone(); + const handle = query.subscribeCount(); + + Tracker.autorun(c => { + if (handle.ready()) { + c.stop(); + const count = query.getCount(); + assert.equal(count, 3); + + Meteor.call('addPost', 'text4', (error, newId) => { + const newCount = query.getCount(); + assert.equal(newCount, 4); + + Meteor.call('removePost', newId); + handle.stop(); + done(); + }); + } + }); + }); + + it('Should update when a document is removed', function (done) { + const query = NamedQuery.clone(); + const handle = query.subscribeCount(); + + Tracker.autorun(c => { + if (handle.ready()) { + c.stop(); + const count = query.getCount(); + assert.equal(count, 3); + + Meteor.call('removePost', 'removeid', (error) => { + const newCount = query.getCount(); + assert.equal(newCount, 2); + + handle.stop(); + done(); + }); + } + }); + }); +}); diff --git a/lib/query/counts/testing/server.test.js b/lib/query/counts/testing/server.test.js new file mode 100644 index 0000000..a8cdf69 --- /dev/null +++ b/lib/query/counts/testing/server.test.js @@ -0,0 +1,19 @@ +import { Meteor } from 'meteor/meteor'; +import PostsCollection from './bootstrap/collection.test'; +import query from './bootstrap/namedQuery.test'; + +query.expose(); +PostsCollection.remove({}); +PostsCollection.insert({ text: 'text 1' }); +PostsCollection.insert({ text: 'text 2' }); +PostsCollection.insert({ _id: 'removeid', text: 'text 3' }); + +Meteor.methods({ + addPost(text) { + return PostsCollection.insert({ text }); + }, + + removePost(id) { + PostsCollection.remove({ _id: id }); + }, +}); diff --git a/lib/query/query.client.js b/lib/query/query.client.js index ec88dd1..035261a 100644 --- a/lib/query/query.client.js +++ b/lib/query/query.client.js @@ -1,4 +1,5 @@ import { _ } from 'meteor/underscore'; +import CountSubscription from './counts/countSubscription'; import createGraph from './lib/createGraph.js'; import recursiveFetch from './lib/recursiveFetch.js'; import prepareForProcess from './lib/prepareForProcess.js'; @@ -22,6 +23,23 @@ export default class Query extends Base { return this.subscriptionHandle; } + /** + * Subscribe to the counts for this query + * + * @param callback + * @returns {Object} + */ + subscribeCount(callback) { + if (!this._counter) { + this._counter = new CountSubscription(this); + } + + return this._counter.subscribe( + prepareForProcess(this.body, this.params), + callback + ); + } + /** * Unsubscribe if an existing subscription exists */ @@ -33,6 +51,16 @@ export default class Query extends Base { this.subscriptionHandle = null; } + /** + * Unsubscribe to the counts if a subscription exists. + */ + unsubscribeCount() { + if (this._counter) { + this._counter.unsubscribe(); + this._counter = null; + } + } + /** * Fetches elements in sync using promises * @return {*} @@ -90,6 +118,10 @@ export default class Query extends Base { * @returns {any} */ async getCountSync() { + if (this._counter) { + throw new Meteor.Error('This query is reactive, meaning you cannot use promises to fetch the data.'); + } + return await callWithPromise(this.name + '.count', prepareForProcess(this.body, this.params)); } @@ -99,11 +131,19 @@ export default class Query extends Base { * @returns {any} */ getCount(callback) { - if (!callback) { - throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count.'); + if (this._counter) { + return this._counter.getCount(); + } else { + if (!callback) { + throw new Meteor.Error('not-allowed', 'You are on client so you must either provide a callback to get the count or subscribe first.'); + } else { + return Meteor.call( + this.name + '.count', + prepareForProcess(this.body, this.params), + callback + ); + } } - - return Meteor.call(this.name + '.count', prepareForProcess(this.body, this.params), callback); } /** diff --git a/package.js b/package.js index 7656579..ca1d1cd 100644 --- a/package.js +++ b/package.js @@ -23,13 +23,14 @@ Package.onUse(function (api) { 'ecmascript', 'underscore', 'promise', + 'reactive-var', + 'mongo', 'aldeed:simple-schema@1.5.3', 'matb33:collection-hooks@0.8.4', 'reywood:publish-composite@1.4.2', 'dburles:mongo-collection-instances@0.3.5', 'tmeasday:check-npm-versions@0.3.1', 'meteorhacks:aggregate@1.3.0', - 'mongo' ]; api.use(packages); @@ -82,6 +83,10 @@ Package.onTest(function (api) { api.addFiles('lib/namedQuery/testing/bootstrap/client.js', 'client'); api.addFiles('lib/namedQuery/testing/bootstrap/server.js', 'server'); + // REACTIVE COUNTS + api.addFiles('lib/query/counts/testing/server.test.js', 'server'); + api.addFiles('lib/query/counts/testing/client.test.js', 'client'); + api.addFiles('lib/namedQuery/testing/server.test.js', 'server'); api.addFiles('lib/namedQuery/testing/client.test.js', 'client'); });