You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
345 lines
10 KiB
345 lines
10 KiB
<?php |
|
|
|
function islandora_workflow_client_menu() |
|
{ |
|
$items['admin/settings/workflow_client'] = array( |
|
'title' => 'Islandora Workflow Client', |
|
'description' => 'Manage Islandora Workflows', |
|
'page callback' => 'islandora_workflow_client_manage', |
|
'access arguments' => array('administer site configuration'), |
|
'type' => MENU_NORMAL_ITEM |
|
); |
|
|
|
return $items; |
|
} |
|
|
|
|
|
function islandora_workflow_client_search_submit($form,&$form_state) |
|
{ |
|
if (trim($form['collection_pid']['#value']) !== '') |
|
{ |
|
drupal_goto('admin/settings/workflow_client/'.$form['process_name']['#value'].'/'.$form['collection_pid']['#value']); |
|
} else |
|
{ |
|
drupal_goto('admin/settings/workflow_client/'.$form['process_name']['#value']); |
|
} |
|
} |
|
|
|
function islandora_workflow_client_search() |
|
{ |
|
module_load_include('inc', 'fedora_repository', 'api/fedora_utils'); |
|
|
|
$form = array(); |
|
$form['process_name'] = array( |
|
'#type' => 'textfield', |
|
'#title' => t('Search by Process Name'), |
|
'#description' => t('Returns a list of objects that match the process name(s) entered. Separate multiple names by spaces.'), |
|
); |
|
|
|
$form['collection_pid'] = array( |
|
'#type' => 'textfield', |
|
'#title' => t('Search by Collection PID'), |
|
'#description' => t('Returns only objects that match the also match the collection pid(s) entered. Separate multiple PIDs by spaces.'), |
|
); |
|
|
|
|
|
$form['submit'] = array('#type' => 'submit', '#value' => t('Search')); |
|
|
|
return $form; |
|
} |
|
|
|
function islandora_workflow_client_manage($terms = null, $collection = null, $queue= null, $queueProcess = null) |
|
{ |
|
if ($collection == 'none') |
|
{ |
|
$collection = null; |
|
} |
|
|
|
$output = ''; |
|
if (trim($terms) != '') |
|
{ |
|
module_load_include('inc', 'fedora_repository', 'api/fedora_item'); |
|
module_load_include('inc', 'islandora_workflow_client', 'workflow'); |
|
|
|
if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE) |
|
{ |
|
drupal_set_message(t('Error: Islandora_solr_search module is required to search by process.')); |
|
} else |
|
{ |
|
$host = variable_get('islandora_solr_search_block_host','localhost'); |
|
$port = variable_get('islandora_solr_search_block_port','8080'); |
|
$appName = variable_get('islandora_solr_search_block_app_name','solr'); |
|
$solr = new Apache_Solr_Service($host, $port, '/'.$appName.'/'); |
|
|
|
try |
|
{ |
|
if ($solr->ping()) |
|
{ |
|
|
|
$q = preg_split('/\s+/',$terms); |
|
foreach ($q as $key=>$bit) |
|
{ |
|
$q[$key]='workflow_process_t:'.htmlentities($bit); |
|
} |
|
$query = join(' OR ',$q); |
|
|
|
if (trim($collection) != '') |
|
{ |
|
$q= preg_split('/\s/',$collection); |
|
foreach ($q as $key=>$bit) |
|
{ |
|
$q[$key]='related_item_identifier_t:'.htmlentities(preg_replace('/\:/','/',$bit)); |
|
} |
|
$query .= ' AND ('. join(' OR ',$q).')'; |
|
} |
|
|
|
$results = $solr->search($query,0,100); |
|
|
|
$pids=array(); |
|
$processes = array(); |
|
if ($results->response->numFound ==0 ) |
|
{ |
|
drupal_set_message(t('No processes found.')); |
|
} else |
|
{ |
|
foreach($results->response->docs as $doc) |
|
{ |
|
$id = preg_replace('/\//',':',$doc->id); |
|
$collection_pid = preg_replace('/\//',':',$doc->related_item_identifier_t); |
|
|
|
$pids[]=$id; |
|
if (!is_array($doc->workflow_process_t)) |
|
{ |
|
if (!is_array($processes[$doc->workflow_process_t])) |
|
$processes[$doc->workflow_process_t]=array($id); |
|
else |
|
$processes[$doc->workflow_process_t][]=$id; |
|
} else |
|
{ |
|
foreach ($doc->workflow_process_t as $process) |
|
{ |
|
if (!is_array($processes[$process])) |
|
$processes[$process]=array($id); |
|
else |
|
$processes[$process][]=$id; |
|
} |
|
} |
|
} |
|
} |
|
|
|
$workflows=array(); |
|
foreach ($pids as $pid) |
|
{ |
|
$workflows[$pid]=Workflow::loadFromObject($pid); |
|
} |
|
|
|
if (count($processes) > 0) |
|
{ |
|
$errors=array(); |
|
$headers = array('Process Name', '# Waiting to Run', '# Completed', '# Errors', 'Action'); |
|
$rows=array(); |
|
foreach ($processes as $name=>$pids) |
|
{ |
|
$errCount = 0; |
|
$waitCount =0; |
|
$completeCount = 0; |
|
foreach ($pids as $pid) |
|
{ |
|
|
|
if ( isset($workflows[$pid]) && $workflows[$pid] !== false ) |
|
{ |
|
$procs = $workflows[$pid]->getProcesses(); |
|
$updated = FALSE; |
|
foreach ($procs as $id=>$n) |
|
{ |
|
|
|
if ($name == $n) |
|
{ |
|
$proc=$workflows[$pid]->getProcess($id); |
|
if (($queue == 'queue'|| ($queue =='errorQueue' && $proc['state'] == 'error')) && $queueProcess == $n) |
|
{ |
|
$workflows[$pid]->setState($id,'waiting'); |
|
$updated=TRUE; |
|
} |
|
|
|
|
|
switch ($proc['state']) |
|
{ |
|
case 'completed': |
|
$completeCount++; |
|
break; |
|
case 'waiting': |
|
$waitCount++; |
|
break; |
|
case 'error': |
|
$errCount++; |
|
$errors[]=$proc; |
|
break; |
|
} |
|
} |
|
} |
|
if ($updated) |
|
{ |
|
$workflows[$pid]->saveToFedora(); |
|
} |
|
} |
|
|
|
} |
|
$rows[]= array($name, $waitCount,$completeCount,$errCount,l('Add All to Queue','admin/settings/workflow_client/'.$terms.'/'.(trim($collection)==''?'none':$collection).'/queue/'.$name).'<br/>'.l('Add Errors to Queue','admin/settings/workflow_client/'.$terms.'/'.(trim($collection)==''?'none':$collection).'/errorQueue/'.$name)); |
|
} |
|
|
|
if ($queue == 'queue' || $queue == 'errorQueue') |
|
{ |
|
drupal_goto('admin/settings/workflow_client/'.$terms.(trim($collection)==''?'/'.$collection:'')); |
|
} |
|
|
|
$output.='<h3>Search for "'.$terms.'" '.(trim($collection)!=''?'in collection(s) "'.$collection.'" ':'').'returned Processes:</h3>'; |
|
$output.=theme('table',$headers,$rows); |
|
|
|
if (count ($errors) > 0) |
|
{ |
|
$output.='<h3>Found Errors</h3>'; |
|
foreach ($errors as $proc) |
|
{ |
|
$output.='<b>Process id: </b> '.$proc['id'].'<br/>'; |
|
$output.='<b>Process name: </b> '.$proc['name'].'<br/>'; |
|
$output.='<b># Attempts: </b> '.$proc['attempts'].'<br/>'; |
|
$output.='<b>Timestamp: </b> '.$proc['timestamp'].'<br/>'; |
|
$output.='<b>Message: </b> '.$proc['message'].'<br/>'; |
|
} |
|
} |
|
} |
|
|
|
|
|
} else |
|
{ |
|
drupal_set_message(t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".',array('%pid'=>$pid,'%solr'=>$host.':'.$port.'/'.$appName.'/'))); |
|
|
|
} |
|
} catch (Exception $e) |
|
{ |
|
drupal_set_message(t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid'=>$pid,'%solr'=>$host.':'.$port.'/'.$appName.'/','%msg'=>$e->getMessage()))); |
|
} |
|
} |
|
|
|
} |
|
$output .= drupal_get_form('islandora_workflow_client_search'); |
|
|
|
return $output; |
|
} |
|
|
|
|
|
function islandora_workflow_client_cron() |
|
{ |
|
module_load_include('inc', 'fedora_repository', 'api/fedora_item'); |
|
module_load_include('inc', 'islandora_workflow_client', 'workflow'); |
|
ob_start(); |
|
echo 'workflow client running'."\n"; |
|
$con = new Stomp(variable_get('fedora_stomp_url', 'tcp://localhost:61613')); |
|
$queue='/queue/fedora.apim.update'; |
|
$con->subscribe($queue); |
|
$messagesToSend=array(); |
|
for ($i=0;$i<50;$i++) { |
|
$msg = $con->readFrame(); |
|
|
|
if ($msg != null) { |
|
$con->ack($msg); |
|
|
|
$xmlobj = simplexml_load_string($msg->body); |
|
$dsid=null; $pid=null; |
|
|
|
|
|
$logMessage = ''; |
|
foreach ($xmlobj->category as $cat) { |
|
switch ($cat['scheme']) { |
|
case 'fedora-types:dsID': |
|
$dsid=(string)$cat['term']; |
|
break; |
|
case 'fedora-types:pid': |
|
$pid=(string)$cat['term']; |
|
break; |
|
case 'fedora-types:logMessage': |
|
$logMessage=(string)$cat['term']; |
|
break; |
|
} |
|
} |
|
echo $pid.' '.$xmlobj->title ."\n"; |
|
if ($pid !== NULL && $xmlobj->title =='purgeObject') { // delete the object from the solr index. |
|
if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FAlSE) { |
|
echo t('Unable to load Solr/Service from islandora_solr_search module on "%pid" for process solr_index.', array('%pid' => $pid)).'<br/>'; |
|
} else { |
|
$host = variable_get('islandora_solr_search_block_host', 'localhost'); |
|
$port = variable_get('islandora_solr_search_block_port', '8080'); |
|
$appName = variable_get('islandora_solr_search_block_app_name', 'solr'); |
|
$solr = new Apache_Solr_Service($host, $port, '/'. $appName .'/'); |
|
|
|
try { |
|
if ($solr->ping()) { |
|
$solr->deleteById($pid); |
|
$solr->commit(); |
|
$solr->optimize(); |
|
} else { |
|
echo t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".', array('%pid' => $pid, '%solr' => $host .':'. $port . '/'. $appName .'/')).'<br/>'; |
|
} |
|
} |
|
catch (Exception $e) { |
|
echo t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid' => $pid, '%solr' => $host .':'. $port .'/'. $appName .'/', '%msg' => $e->getMessage())).'<br/>'; |
|
} |
|
} |
|
|
|
} else if ($pid !== NULL && ($wf= Workflow::loadFromObject($pid)) !== FALSE) { |
|
|
|
if (!$wf->validate()) { |
|
// var_dump(Workflow::$errors); |
|
$con->send($queue,$msg->body); |
|
|
|
} else { |
|
|
|
$saveFlag=FALSE; |
|
if ($dsid !== NULL) { |
|
$saveFlag = $wf->resetDependancies($dsid,$logMessage); |
|
} |
|
$procs = $wf->getReadyProcesses(); |
|
|
|
foreach ($procs as $id=>$proc) { |
|
|
|
if (module_load_include('inc','islandora_workflow_client','plugins/'.$proc)!==FALSE && class_exists($proc)) |
|
{ |
|
$procClass = new $proc($wf,$id); |
|
$procClass->run(); |
|
$saveFlag = true; |
|
} |
|
} |
|
|
|
// saves the workflow. Note that this will also triger a JMS message so if there are still things |
|
// to do in the workflow, then it will be put back on the queue. |
|
if ($saveFlag) |
|
{ |
|
// echo htmlentities($wf->dumpXml()); exit(); |
|
$wf->saveToFedora(); |
|
} else if (count($wf->getReadyProcesses()) > 0) { |
|
// If there is still more to do and we dont need to save the workflow datastream, then |
|
// that means there is a process that does not have a corresponding plugin (perhaps it is handled |
|
// by another client?), so put it back on the queue. |
|
|
|
$messagesToSend[] = $msg->body; |
|
} |
|
} |
|
} |
|
} else { |
|
echo '.'; |
|
} |
|
} |
|
|
|
// Messages to send are sent out after we're done |
|
// so that we dont try and process messages we put back on the queue immediately. |
|
foreach ($messagesToSend as $msg) { |
|
$con->send($queue,$msg); |
|
} |
|
|
|
$dump=ob_get_contents(); |
|
ob_end_clean(); |
|
if (trim($dump) !== '') { |
|
watchdog('workflow','<pre>'.$dump.'</pre>',array(),WATCHDOG_NOTICE); |
|
} |
|
}
|
|
|