<?php
/**
 * Implementation of hook_menu().
 *
 * Registers the workflow client administration page under the site
 * settings menu; the page is rendered by islandora_workflow_client_manage().
 *
 * @return
 *   Menu item definitions keyed by path.
 */
function islandora_workflow_client_menu()
{
  return array(
    'admin/settings/workflow_client' => array(
      'title' => 'Islandora Workflow Client',
      'description' => 'Manage Islandora Workflows',
      'page callback' => 'islandora_workflow_client_manage',
      // Restricted to users allowed to administer site configuration.
      'access arguments' => array('administer site configuration'),
      'type' => MENU_NORMAL_ITEM,
    ),
  );
}
/**
 * Submit handler for the workflow search form.
 *
 * Redirects to admin/settings/workflow_client/<names>/<process id>/<collection>
 * so that the page callback receives the search values as path arguments.
 * An empty optional field is represented by the placeholder '-'.
 *
 * @param $form
 *   The built form; the submitted values are read from each element's #value.
 * @param $form_state
 *   The form state (not modified here).
 */
function islandora_workflow_client_search_submit($form,&$form_state)
{
  $segments = array('admin/settings/workflow_client', $form['process_name']['#value']);
  // Each optional filter contributes exactly one path segment. The previous
  // '/-/' placeholder carried a trailing slash, producing an empty segment
  // ('name/-//pid') that shifted the following value into the wrong argument.
  foreach (array('process_id', 'collection_pid') as $field) {
    $value = trim($form[$field]['#value']);
    $segments[] = ($value !== '') ? $value : '-';
  }
  drupal_goto(implode('/', $segments));
}
/**
 * Form builder for the workflow process search form.
 *
 * @param $form_state
 *   The form state supplied by the Form API.
 * @param $terms
 *   Default value for the process name field.
 * @param $process_id
 *   Default value for the process id field.
 * @param $collection
 *   Default value for the collection PID field.
 *
 * @return
 *   The form definition array.
 */
function islandora_workflow_client_search(&$form_state,$terms=null,$process_id=null,$collection=null)
{
  module_load_include('inc', 'fedora_repository', 'api/fedora_utils');
  $form = array();
  $form['process_name'] = array(
    '#type' => 'textfield',
    '#required' => TRUE,
    '#title' => t('Search by Process Name'),
    '#default_value' => ($terms !== NULL ? $terms : ''),
    '#description' => t('Returns a list of objects that match the process name(s) entered. Separate multiple names by spaces.'),
  );
  $form['process_id'] = array(
    '#type' => 'textfield',
    '#title' => t('Search by Process ID'),
    '#default_value' => ($process_id !== NULL ? $process_id : ''),
    // Wording corrected: the old text read "match the also match the".
    '#description' => t('Returns only objects that also match the process id entered.'),
  );
  $form['collection_pid'] = array(
    '#type' => 'textfield',
    '#title' => t('Search by Collection PID'),
    '#default_value' => ($collection !== NULL ? $collection : ''),
    '#description' => t('Returns only objects that also match the collection pid(s) entered. Separate multiple PIDs by spaces.'),
  );
  $form['submit'] = array('#type' => 'submit', '#value' => t('Search'));
  return $form;
}
/**
 * Page callback for admin/settings/workflow_client.
 *
 * Searches Solr for objects whose workflow contains the named process(es),
 * tallies the state of every matching process into a table, optionally resets
 * processes to 'waiting' so they are run again, and appends the search form.
 *
 * @param $terms
 *   Whitespace-separated process names (path argument).
 * @param $process_id
 *   Optional process id filter; 'none' or '-' means no filter.
 * @param $collection
 *   Optional whitespace-separated collection PIDs; 'none' or '-' means no filter.
 * @param $queue
 *   'queue' resets every matching process to waiting; 'errorQueue' resets only
 *   those currently in the error state.
 * @param $queueProcess
 *   Name of the process the queue action applies to.
 *
 * @return
 *   The rendered page HTML.
 */
function islandora_workflow_client_manage($terms = null, $process_id= null, $collection = null, $queue= null, $queueProcess = null)
{
  // 'none' and '-' are path placeholders for an omitted filter.
  if ($collection == 'none' || $collection == '-') {
    $collection = null;
  }
  if ($process_id == 'none' || $process_id == '-') {
    $process_id = null;
  }
  $output = '';
  if (trim((string) $terms) != '')
  {
    module_load_include('inc', 'fedora_repository', 'api/fedora_item');
    module_load_include('inc', 'islandora_workflow_client', 'workflow');
    if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE)
    {
      drupal_set_message(t('Error: Islandora_solr_search module is required to search by process.'));
    } else
    {
      $host = variable_get('islandora_solr_search_block_host','localhost');
      $port = variable_get('islandora_solr_search_block_port','8080');
      $appName = variable_get('islandora_solr_search_block_app_name','solr');
      $solr = new Apache_Solr_Service($host, $port, '/'.$appName.'/');
      $solrUrl = $host.':'.$port.'/'.$appName.'/';
      // Path of the current search, with placeholders for omitted filters.
      // Used for the queue links and for the redirect after queueing.
      $searchPath = 'admin/settings/workflow_client/'.$terms
        .'/'.(trim((string) $process_id) == '' ? 'none' : $process_id)
        .'/'.(trim((string) $collection) == '' ? 'none' : $collection);
      try
      {
        if ($solr->ping())
        {
          // NOTE(review): htmlentities() is kept from the original, but it is
          // not a Solr query escaper; confirm whether query syntax in the
          // terms (wildcards etc.) is meant to pass through.
          $clauses = array();
          foreach (preg_split('/\s+/', $terms, -1, PREG_SPLIT_NO_EMPTY) as $bit)
          {
            $clauses[] = 'workflow_process_t:'.htmlentities($bit);
          }
          // Parenthesised so a following AND applies to the whole OR group
          // rather than only to the last process name.
          $query = '('.join(' OR ', $clauses).')';
          if (trim((string) $collection) != '')
          {
            $clauses = array();
            foreach (preg_split('/\s+/', $collection, -1, PREG_SPLIT_NO_EMPTY) as $bit)
            {
              // The index stores identifiers with '/' in place of ':'.
              $clauses[] = 'related_item_identifier_t:'.htmlentities(str_replace(':', '/', $bit));
            }
            $query .= ' AND ('.join(' OR ', $clauses).')';
          }
          // Only the first 100 matching documents are considered.
          $results = $solr->search($query,0,100);
          $pids = array();
          $processes = array();
          if ($results->response->numFound == 0)
          {
            drupal_set_message(t('No processes found.'));
          } else
          {
            foreach ($results->response->docs as $doc)
            {
              $id = str_replace('/', ':', $doc->id);
              $pids[] = $id;
              // The field may hold a single name or a list of names.
              $names = is_array($doc->workflow_process_t) ? $doc->workflow_process_t : array($doc->workflow_process_t);
              foreach ($names as $process)
              {
                // Appending creates the per-process list on first use.
                $processes[$process][] = $id;
              }
            }
          }
          $workflows = array();
          foreach ($pids as $pid)
          {
            $workflows[$pid] = Workflow::loadFromObject($pid);
          }
          if (count($processes) > 0)
          {
            $errors = array();
            $headers = array('Process Name', '# Waiting to Run', '# Completed', '# Errors', 'Action');
            $rows = array();
            foreach ($processes as $name => $processPids)
            {
              $errCount = 0;
              $waitCount = 0;
              $completeCount = 0;
              $display = false;
              foreach ($processPids as $pid)
              {
                if (!isset($workflows[$pid]) || $workflows[$pid] === false)
                {
                  continue;
                }
                $display = true;
                $updated = FALSE;
                foreach ($workflows[$pid]->getProcesses() as $id => $n)
                {
                  if (($process_id != null && $id != $process_id) || $name != $n)
                  {
                    continue;
                  }
                  $proc = $workflows[$pid]->getProcess($id);
                  if (($queue == 'queue' || ($queue == 'errorQueue' && $proc['state'] == 'error')) && $queueProcess == $n)
                  {
                    $workflows[$pid]->setState($id,'waiting');
                    $updated = TRUE;
                  }
                  // Counts reflect the state read before any reset above.
                  switch ($proc['state'])
                  {
                    case 'completed':
                      $completeCount++;
                      break;
                    case 'waiting':
                      $waitCount++;
                      break;
                    case 'error':
                      $errCount++;
                      $errors[] = $proc;
                      break;
                  }
                }
                // Save once per workflow, after all of its processes have
                // been examined, instead of once per loop iteration.
                if ($updated)
                {
                  $workflows[$pid]->saveToFedora();
                }
              }
              if ($display) {
                $rows[] = array(check_plain($name), $waitCount, $completeCount, $errCount,
                  l('Add All to Queue', $searchPath.'/queue/'.$name).'<br/>'.
                  l('Add Errors to Queue', $searchPath.'/errorQueue/'.$name));
              }
            }
            if ($queue == 'queue' || $queue == 'errorQueue')
            {
              // Return to the same search (filters preserved) without the
              // queue action in the path, so a reload does not queue again.
              drupal_goto($searchPath);
            }
            // The search values come from the URL, so escape them for HTML.
            $heading = 'Search for ';
            if ($process_id != null)
            {
              $heading .= 'process id '.check_plain($process_id).' with terms ';
            }
            $heading .= ' "'.check_plain($terms).'" ';
            if (trim((string) $collection) != '')
            {
              $heading .= 'in collection(s) "'.check_plain($collection).'" ';
            }
            $output .= '<h3>'.$heading.'returned Processes:</h3>';
            $output .= theme('table',$headers,$rows);
            if (count($errors) > 0)
            {
              $output .= '<h3>Found Errors</h3>';
              foreach ($errors as $proc)
              {
                $output .= '<b>Process id: </b> '.check_plain($proc['id']).'<br/>';
                $output .= '<b>Process name: </b> '.check_plain($proc['name']).'<br/>';
                $output .= '<b># Attempts: </b> '.check_plain($proc['attempts']).'<br/>';
                $output .= '<b>Timestamp: </b> '.check_plain($proc['timestamp']).'<br/>';
                // Messages may carry simple markup; strip only unsafe HTML.
                $output .= '<b>Message: </b> '.filter_xss_admin($proc['message']).'<br/>';
              }
            }
          }
        } else
        {
          // No object PID exists at this point, so the message names only
          // the Solr endpoint.
          drupal_set_message(t('Unable to connect to Solr at "%solr".', array('%solr' => $solrUrl)));
        }
      } catch (Exception $e)
      {
        drupal_set_message(t('Caught exception from Solr at %solr: %msg', array('%solr' => $solrUrl, '%msg' => $e->getMessage())));
      }
    }
  }
  // Arguments after the form id are handed to the form builder following
  // $form_state, so they must line up with ($terms, $process_id, $collection).
  $output .= drupal_get_form('islandora_workflow_client_search', $terms, $process_id, $collection);
  return $output;
}
/**
 * Implementation of hook_cron().
 *
 * Reads up to 500 frames from the Fedora update queue over Stomp. For a
 * purgeObject message the object is deleted from the Solr index; for any
 * other message the object's workflow (if it has one) is loaded and its ready
 * processes are run through the matching plugin classes. Everything echoed
 * along the way is captured and logged to watchdog at the end.
 */
function islandora_workflow_client_cron()
{
  module_load_include('inc', 'fedora_repository', 'api/fedora_item');
  module_load_include('inc', 'islandora_workflow_client', 'workflow');
  ob_start();
  echo 'workflow client running'."\n";
  $con = new Stomp(variable_get('fedora_stomp_url', 'tcp://localhost:61613'));
  $queue = '/queue/fedora.apim.update';
  $con->subscribe($queue);
  // Message bodies to put back on the queue once this run is finished.
  $messagesToSend = array();
  for ($i = 0; $i < 500; $i++) {
    $msg = $con->readFrame();
    if ($msg == null) {
      echo '.';
      continue;
    }
    $con->ack($msg);
    $xmlobj = simplexml_load_string($msg->body);
    if ($xmlobj === FALSE) {
      // The frame is already acknowledged; nothing can be done with a body
      // that is not well-formed XML.
      echo 'Skipping message: body could not be parsed as XML.'."\n";
      continue;
    }
    $dsid = null;
    $pid = null;
    $logMessage = '';
    foreach ($xmlobj->category as $cat) {
      switch ((string) $cat['scheme']) {
        case 'fedora-types:dsID':
          $dsid = (string) $cat['term'];
          break;
        case 'fedora-types:pid':
          $pid = (string) $cat['term'];
          break;
        case 'fedora-types:logMessage':
          $logMessage = (string) $cat['term'];
          break;
      }
    }
    $title = (string) $xmlobj->title;
    echo $pid.' '.$title."\n";
    if ($pid !== NULL && $title == 'purgeObject') {
      // Delete the object from the solr index.
      if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE) {
        echo t('Unable to load Solr/Service from islandora_solr_search module on "%pid" for process solr_index.', array('%pid' => $pid)).'<br/>';
      } else {
        $host = variable_get('islandora_solr_search_block_host', 'localhost');
        $port = variable_get('islandora_solr_search_block_port', '8080');
        $appName = variable_get('islandora_solr_search_block_app_name', 'solr');
        $solr = new Apache_Solr_Service($host, $port, '/'. $appName .'/');
        try {
          if ($solr->ping()) {
            $solr->deleteById($pid);
            $solr->commit();
            $solr->optimize();
          } else {
            echo t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".', array('%pid' => $pid, '%solr' => $host .':'. $port . '/'. $appName .'/')).'<br/>';
          }
        }
        catch (Exception $e) {
          echo t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid' => $pid, '%solr' => $host .':'. $port .'/'. $appName .'/', '%msg' => $e->getMessage())).'<br/>';
        }
      }
    } else if ($pid !== NULL && ($wf = Workflow::loadFromObject($pid)) !== FALSE) {
      if (!$wf->validate()) {
        // Requeue the message, but only after this run: sending it right
        // away would let the loop read the same message straight back.
        $messagesToSend[] = $msg->body;
      } else {
        $saveFlag = FALSE;
        if ($dsid !== NULL) {
          $saveFlag = $wf->resetDependancies($dsid,$logMessage);
        }
        $procs = $wf->getReadyProcesses();
        foreach ($procs as $id => $proc) {
          // NOTE(review): $proc is used to build the include path and the
          // class name; confirm process names are validated upstream.
          if (module_load_include('inc','islandora_workflow_client','plugins/'.$proc) !== FALSE && class_exists($proc))
          {
            $procClass = new $proc($wf,$id);
            $procClass->run();
            $saveFlag = true;
          }
        }
        // Saves the workflow. Note that this will also trigger a JMS message, so if there are
        // still things to do in the workflow it will be put back on the queue.
        if ($saveFlag)
        {
          $wf->saveToFedora();
        } else if (count($wf->getReadyProcesses()) > 0) {
          // If there is still more to do and we don't need to save the workflow datastream, then
          // there is a process that does not have a corresponding plugin (perhaps it is handled
          // by another client?), so put it back on the queue.
          $messagesToSend[] = $msg->body;
        }
      }
    }
  }
  // Messages to send are sent out after we're done
  // so that we don't try to process messages we put back on the queue immediately.
  foreach ($messagesToSend as $body) {
    $con->send($queue,$body);
  }
  $dump = ob_get_contents();
  ob_end_clean();
  if (trim($dump) !== '') {
    watchdog('workflow','<pre>'.$dump.'</pre>',array(),WATCHDOG_NOTICE);
  }
}