Child pages
  • Grouper loader real time updates
Skip to end of metadata
Go to start of metadata

This feature allows you to have incremental loading of memberships for SQL and LDAP based loader jobs.  For SQL jobs, you can add database triggers onto your existing loader tables to populate a new table to indicate which users have been updated for which jobs.  And for both SQL and LDAP jobs, you can add messages to a message queue and have Grouper look for changes there.  Then the incremental loader can process updates quickly just for those users.

If you use this with SQL_GROUP_LIST, the loader job must have grouperLoaderGroupsLike configured.  Also, for SQL_GROUP_LIST, if a new group doesn't exist in Grouper yet, the incremental loader will trigger a full sync.

Support for LDAP jobs is minimal right now.  If a message is added to the message queue for an LDAP job, a full sync is simply performed.  If you plan to use this feature and would like to see this improved, please contact the Grouper Team.

Configuration

By default, multiple threads will be used to improve performance.  This is the default configuration but can be adjusted in grouper-loader.properties.

loader.incrementalThreads=true
loader.incrementalThreadPoolSize=10


In grouper-loader.properties, configure your incremental jobs.  You can have multiple jobs, which would mainly be relevant if you use multiple databases for your loader jobs.

otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0 * * * * ?
otherJob.incrementalLoader1.databaseName=warehouse
otherJob.incrementalLoader1.tableName=myincrementaltable
 
otherJob.incrementalLoader2.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader2.quartzCron = 0 * * * * ?
otherJob.incrementalLoader2.databaseName=warehouse2
otherJob.incrementalLoader2.tableName=myincrementaltable


Database setup

Your incremental table should have the following columns:

  • id - primary key (e.g. auto incremental or a sequence)
  • subject_id - mutually exclusive with subject_identifier/subject_id_or_identifier
  • subject_identifier - mutually exclusive with subject_id/subject_id_or_identifier
  • subject_id_or_identifier - mutually exclusive with subject_id/subject_identifier
  • source_id - optional
  • loader_group_name - Grouper group name of the loader group that stores the definition.
  • timestamp - milliseconds.  Your trigger should populate this.
  • completed_timestamp - milliseconds.  The incremental loader will populate this when the row is processed.  After one day, the row will be deleted.
HSQLDB:
 
CREATE TABLE myincrementaltable
(
    id INTEGER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp BIGINT NOT NULL,
    completed_timestamp BIGINT,
    PRIMARY KEY (id)
);

Messaging

If you are triggering updates via messaging instead of database triggers, the table above is still needed.  A builtin message listener will simply monitor your queue and add rows to the table.  Note that the "id" column has to be populated automatically on insert (i.e. use an auto increment type column or use a trigger to populate based on a sequence before the insert).

In your grouper-loader.properties file, add the following configuration for a new message listener:

messaging.listener.myCustomMessagingListener.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalMessagingListener
messaging.listener.myCustomMessagingListener.quartzCron = 0 * * * * ?
messaging.listener.myCustomMessagingListener.messagingSystemName = grouperBuiltinMessaging
messaging.listener.myCustomMessagingListener.queueName = abc
messaging.listener.myCustomMessagingListener.numberOfTriesPerIteration = 3
messaging.listener.myCustomMessagingListener.pollingTimeoutSeconds = 18
messaging.listener.myCustomMessagingListener.sleepSecondsInBetweenIterations = 0
messaging.listener.myCustomMessagingListener.maxMessagesToReceiveAtOnce = 20
# if there are 20 messages to receive at once, then do this 50 times per call max
messaging.listener.myCustomMessagingListener.maxOuterLoops = 50
messaging.listener.myCustomMessagingListener.incrementalLoaderJobName = incrementalLoader1
  • Update the messagingSystemName to point to your messaging system (Grouper supports a built in messaging system along with RabbitMQ, AWS, etc).
  • Update the queueName
  • Update incrementalLoaderJobName based on what was configured earlier in the Configuration section above.

Format of messages:

{'subjectId':'test.subject.0', 'loaderGroupName':'test:owner', 'sourceId':'jdbc'}
  • Must have subjectId, subjectIdentifier or subjectIdOrIdentifier.
  • Must have loaderGroupName
  • sourceId is optional

Fail safe

If there are more than a certain number of changes for a loader job, the incremental loader will avoid processing those changes and instead immediately trigger a full sync.  This helps with performance but also outsources fail safe to the full sync, which has its own configuration options.  The number by default is 100 but can be adjusted in grouper-loader.properties:

otherJob.incrementalLoader1.fullSyncThreshold=100



Examples

Example of SQL_SIMPLE using Oracle (loader table has a group name field to allow multiple SQL_SIMPLE jobs):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id NUMBER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp NUMBER NOT NULL,
    completed_timestamp NUMBER,
    PRIMARY KEY (id)
);
 
And a sequence for the primary key on the incremental table:
 
CREATE SEQUENCE myincrementaltable_seq;
 
And the following loader job:
addRootStem("test", "test")
addGroup("test", "loader1", "loader1")
groupAddType("test:loader1", "grouperLoader")
setGroupAttr("test:loader1", "grouperLoaderDbName", "grouper")
setGroupAttr("test:loader1", "grouperLoaderType", "SQL_SIMPLE")
setGroupAttr("test:loader1", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:loader1", "grouperLoaderQuery", "select subject_id from myloadertable")
setGroupAttr("test:loader1", "grouperLoaderIntervalSeconds", "86400")
 
Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:
 
CREATE OR REPLACE TRIGGER mytrigger
AFTER INSERT OR DELETE OR UPDATE ON myloadertable
FOR EACH ROW
DECLARE
  timemillis NUMBER;
BEGIN
  select extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 
     + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3')) into timemillis from dual;
  IF (:new.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :new.subject_id, :new.group_name, timemillis);
  END IF;
  IF (:old.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :old.subject_id, :old.group_name, timemillis);
   END IF;
END;


Example of SQL_GROUP_LIST using Oracle (assumes a single loader job):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id NUMBER NOT NULL,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp NUMBER NOT NULL,
    completed_timestamp NUMBER,
    PRIMARY KEY (id)
);
 
And a sequence for the primary key on the incremental table:
 
CREATE SEQUENCE myincrementaltable_seq;
 
And the following loader job:
addRootStem("test", "test")
addGroup("test", "owner", "owner")
groupAddType("test:owner", "grouperLoader")
setGroupAttr("test:owner", "grouperLoaderDbName", "grouper")
setGroupAttr("test:owner", "grouperLoaderType", "SQL_GROUP_LIST")
setGroupAttr("test:owner", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:owner", "grouperLoaderQuery", "select group_name, subject_id from myloadertable")
setGroupAttr("test:owner", "grouperLoaderIntervalSeconds", "86400")
setGroupAttr("test:owner", "grouperLoaderGroupsLike", "test:loader%")


Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:
 
CREATE OR REPLACE TRIGGER mytrigger
AFTER INSERT OR DELETE OR UPDATE ON myloadertable
FOR EACH ROW
DECLARE
  timemillis NUMBER;
BEGIN
  select extract(day from(sys_extract_utc(systimestamp) - to_timestamp('1970-01-01', 'YYYY-MM-DD'))) * 86400000 
     + to_number(to_char(sys_extract_utc(systimestamp), 'SSSSSFF3')) into timemillis from dual;
  IF (:new.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :new.subject_id, 'test:owner', timemillis);
  END IF;
  IF (:old.subject_id is not null) THEN
    INSERT INTO myincrementaltable (id, subject_id, loader_group_name, timestamp) values (myincrementaltable_seq.nextval, :old.subject_id, 'test:owner', timemillis);
   END IF;
END;


Example of SQL_SIMPLE using MySQL (loader table has a group name field to allow multiple SQL_SIMPLE jobs):
Say you have a loader table that looks like the following:
 
CREATE TABLE myloadertable
(
    subject_id VARCHAR(255),
    group_name VARCHAR(1024)
);
 
With the following incremental table:
 
CREATE TABLE myincrementaltable
(
    id BIGINT NOT NULL AUTO_INCREMENT,
    subject_id VARCHAR(255),
    subject_identifier VARCHAR(255),
    subject_id_or_identifier VARCHAR(255),
    source_id VARCHAR(255),
    loader_group_name VARCHAR(1024) NOT NULL,
    timestamp BIGINT NOT NULL,
    completed_timestamp BIGINT,
    PRIMARY KEY (id)
);
  
And the following loader job:
addRootStem("test", "test")
addGroup("test", "loader1", "loader1")
groupAddType("test:loader1", "grouperLoader")
setGroupAttr("test:loader1", "grouperLoaderDbName", "grouper")
setGroupAttr("test:loader1", "grouperLoaderType", "SQL_SIMPLE")
setGroupAttr("test:loader1", "grouperLoaderScheduleType", "START_TO_START_INTERVAL")
setGroupAttr("test:loader1", "grouperLoaderQuery", "select subject_id from myloadertable")
setGroupAttr("test:loader1", "grouperLoaderIntervalSeconds", "86400")
 
Assuming this is on the same database as Grouper, you could add the following configuration in grouper-loader.properties (run every 5 seconds):
otherJob.incrementalLoader1.class = edu.internet2.middleware.grouper.app.loader.GrouperLoaderIncrementalJob
otherJob.incrementalLoader1.quartzCron = 0/5 * * * * ?
otherJob.incrementalLoader1.databaseName=grouper
otherJob.incrementalLoader1.tableName=myincrementaltable
 
And the following trigger:

delimiter |
CREATE TRIGGER mytrigger_insert
AFTER INSERT ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (NEW.subject_id, NEW.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;

|
CREATE TRIGGER mytrigger_update
AFTER UPDATE ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (NEW.subject_id, NEW.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (OLD.subject_id, OLD.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;
|
CREATE TRIGGER mytrigger_delete
AFTER DELETE ON myloadertable
FOR EACH ROW
BEGIN
  INSERT INTO myincrementaltable (subject_id, loader_group_name, timestamp) values (OLD.subject_id, OLD.group_name, ROUND(UNIX_TIMESTAMP(CURTIME(4)) * 1000));
END;
|
delimiter ;
 
 
 
 
 
 
 
 


Next steps / dev notes

  • Better support for LDAP loader jobs - currently if there's an update for an LDAP job, it will just run the full sync, which obviously isn't efficient.  Proposed updates:
    • The basic idea is to adjust the search filters when invoked via the real time loader to limit the results.
      • LDAP_SIMPLE - Have a config option that converts the subject id/identifier to the DN (grouperLoaderLdapSubjectReverseExpression).  Basically, this is the opposite of grouperLoaderLdapSubjectExpression.  Then the real-time loader can quickly see if the user is supposed to be in the group by adding the result of this expression to the search filter, e.g. (&(existingFilter)(subjectAttribute=result_of_expression))
      • LDAP_GROUP_LIST - Similar to above with the use of grouperLoaderLdapSubjectReverseExpression.  Also, the group name expression code should be moved to allow it to be reused by the real-time loader as well.
      • LDAP_GROUPS_FROM_ATTRIBUTES - Perhaps only support the real-time loader if grouperLoaderLdapSubjectAttribute is specified and grouperLoaderLdapSubjectExpression is not specified.  This would basically mean that the subject id/identifier would be an attribute of the user.  Otherwise, what is the use case for needing an expression here??
    • Have an option to sync all LDAP groups for a user since it may not be obvious to the deployer which LDAP group(s) would be impacted when a user is updated in LDAP.  Perhaps allow the loader_group_name column to be populated with something like "ALL_LDAP_GROUPS".
    • For SQL_GROUP_LIST, the real-time loader only works when grouperLoaderGroupsLike is specified.  This was needed so that it knew which groups were owned by the job and could adjust their memberships appropriately.  Now that there's loader metadata, perhaps SQL_GROUP_LIST and LDAP_GROUP_LIST can rely on that instead.
  • No labels