You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
346 lines
10 KiB
346 lines
10 KiB
14 years ago
|
<?php
|
||
|
|
||
|
function islandora_workflow_client_menu()
|
||
|
{
|
||
|
$items['admin/settings/workflow_client'] = array(
|
||
|
'title' => 'Islandora Workflow Client',
|
||
|
'description' => 'Manage Islandora Workflows',
|
||
|
'page callback' => 'islandora_workflow_client_manage',
|
||
|
'access arguments' => array('administer site configuration'),
|
||
|
'type' => MENU_NORMAL_ITEM
|
||
|
);
|
||
|
|
||
|
return $items;
|
||
|
}
|
||
|
|
||
|
|
||
|
function islandora_workflow_client_search_submit($form,&$form_state)
|
||
|
{
|
||
|
if (trim($form['collection_pid']['#value']) !== '')
|
||
|
{
|
||
|
drupal_goto('admin/settings/workflow_client/'.$form['process_name']['#value'].'/'.$form['collection_pid']['#value']);
|
||
|
} else
|
||
|
{
|
||
|
drupal_goto('admin/settings/workflow_client/'.$form['process_name']['#value']);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function islandora_workflow_client_search()
|
||
|
{
|
||
|
module_load_include('inc', 'fedora_repository', 'api/fedora_utils');
|
||
|
|
||
|
$form = array();
|
||
|
$form['process_name'] = array(
|
||
|
'#type' => 'textfield',
|
||
|
'#title' => t('Search by Process Name'),
|
||
|
'#description' => t('Returns a list of objects that match the process name(s) entered. Separate multiple names by spaces.'),
|
||
|
);
|
||
|
|
||
|
$form['collection_pid'] = array(
|
||
|
'#type' => 'textfield',
|
||
|
'#title' => t('Search by Collection PID'),
|
||
|
'#description' => t('Returns only objects that match the also match the collection pid(s) entered. Separate multiple PIDs by spaces.'),
|
||
|
);
|
||
|
|
||
|
|
||
|
$form['submit'] = array('#type' => 'submit', '#value' => t('Search'));
|
||
|
|
||
|
return $form;
|
||
|
}
|
||
|
|
||
|
function islandora_workflow_client_manage($terms = null, $collection = null, $queue= null, $queueProcess = null)
|
||
|
{
|
||
|
if ($collection == 'none')
|
||
|
{
|
||
|
$collection = null;
|
||
|
}
|
||
|
|
||
|
$output = '';
|
||
|
if (trim($terms) != '')
|
||
|
{
|
||
|
module_load_include('inc', 'fedora_repository', 'api/fedora_item');
|
||
|
module_load_include('inc', 'islandora_workflow_client', 'workflow');
|
||
|
|
||
|
if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE)
|
||
|
{
|
||
|
drupal_set_message(t('Error: Islandora_solr_search module is required to search by process.'));
|
||
|
} else
|
||
|
{
|
||
|
$host = variable_get('islandora_solr_search_block_host','localhost');
|
||
|
$port = variable_get('islandora_solr_search_block_port','8080');
|
||
|
$appName = variable_get('islandora_solr_search_block_app_name','solr');
|
||
|
$solr = new Apache_Solr_Service($host, $port, '/'.$appName.'/');
|
||
|
|
||
|
try
|
||
|
{
|
||
|
if ($solr->ping())
|
||
|
{
|
||
|
|
||
|
$q = preg_split('/\s+/',$terms);
|
||
|
foreach ($q as $key=>$bit)
|
||
|
{
|
||
|
$q[$key]='workflow_process_t:'.htmlentities($bit);
|
||
|
}
|
||
|
$query = join(' OR ',$q);
|
||
|
|
||
|
if (trim($collection) != '')
|
||
|
{
|
||
|
$q= preg_split('/\s/',$collection);
|
||
|
foreach ($q as $key=>$bit)
|
||
|
{
|
||
|
$q[$key]='related_item_identifier_t:'.htmlentities(preg_replace('/\:/','/',$bit));
|
||
|
}
|
||
|
$query .= ' AND ('. join(' OR ',$q).')';
|
||
|
}
|
||
|
|
||
|
$results = $solr->search($query,0,100);
|
||
|
|
||
|
$pids=array();
|
||
|
$processes = array();
|
||
|
if ($results->response->numFound ==0 )
|
||
|
{
|
||
|
drupal_set_message(t('No processes found.'));
|
||
|
} else
|
||
|
{
|
||
|
foreach($results->response->docs as $doc)
|
||
|
{
|
||
|
$id = preg_replace('/\//',':',$doc->id);
|
||
|
$collection_pid = preg_replace('/\//',':',$doc->related_item_identifier_t);
|
||
|
|
||
|
$pids[]=$id;
|
||
|
if (!is_array($doc->workflow_process_t))
|
||
|
{
|
||
|
if (!is_array($processes[$doc->workflow_process_t]))
|
||
|
$processes[$doc->workflow_process_t]=array($id);
|
||
|
else
|
||
|
$processes[$doc->workflow_process_t][]=$id;
|
||
|
} else
|
||
|
{
|
||
|
foreach ($doc->workflow_process_t as $process)
|
||
|
{
|
||
|
if (!is_array($processes[$process]))
|
||
|
$processes[$process]=array($id);
|
||
|
else
|
||
|
$processes[$process][]=$id;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
$workflows=array();
|
||
|
foreach ($pids as $pid)
|
||
|
{
|
||
|
$workflows[$pid]=Workflow::loadFromObject($pid);
|
||
|
}
|
||
|
|
||
|
if (count($processes) > 0)
|
||
|
{
|
||
|
$errors=array();
|
||
|
$headers = array('Process Name', '# Waiting to Run', '# Completed', '# Errors', 'Action');
|
||
|
$rows=array();
|
||
|
foreach ($processes as $name=>$pids)
|
||
|
{
|
||
|
$errCount = 0;
|
||
|
$waitCount =0;
|
||
|
$completeCount = 0;
|
||
|
foreach ($pids as $pid)
|
||
|
{
|
||
|
|
||
|
if ( isset($workflows[$pid]) && $workflows[$pid] !== false )
|
||
|
{
|
||
|
$procs = $workflows[$pid]->getProcesses();
|
||
|
$updated = FALSE;
|
||
|
foreach ($procs as $id=>$n)
|
||
|
{
|
||
|
|
||
|
if ($name == $n)
|
||
|
{
|
||
|
$proc=$workflows[$pid]->getProcess($id);
|
||
|
if (($queue == 'queue'|| ($queue =='errorQueue' && $proc['state'] == 'error')) && $queueProcess == $n)
|
||
|
{
|
||
|
$workflows[$pid]->setState($id,'waiting');
|
||
|
$updated=TRUE;
|
||
|
}
|
||
|
|
||
|
|
||
|
switch ($proc['state'])
|
||
|
{
|
||
|
case 'completed':
|
||
|
$completeCount++;
|
||
|
break;
|
||
|
case 'waiting':
|
||
|
$waitCount++;
|
||
|
break;
|
||
|
case 'error':
|
||
|
$errCount++;
|
||
|
$errors[]=$proc;
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if ($updated)
|
||
|
{
|
||
|
$workflows[$pid]->saveToFedora();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
$rows[]= array($name, $waitCount,$completeCount,$errCount,l('Add All to Queue','admin/settings/workflow_client/'.$terms.'/'.(trim($collection)==''?'none':$collection).'/queue/'.$name).'<br/>'.l('Add Errors to Queue','admin/settings/workflow_client/'.$terms.'/'.(trim($collection)==''?'none':$collection).'/errorQueue/'.$name));
|
||
|
}
|
||
|
|
||
|
if ($queue == 'queue' || $queue == 'errorQueue')
|
||
|
{
|
||
|
drupal_goto('admin/settings/workflow_client/'.$terms.(trim($collection)==''?'/'.$collection:''));
|
||
|
}
|
||
|
|
||
|
$output.='<h3>Search for "'.$terms.'" '.(trim($collection)!=''?'in collection(s) "'.$collection.'" ':'').'returned Processes:</h3>';
|
||
|
$output.=theme('table',$headers,$rows);
|
||
|
|
||
|
if (count ($errors) > 0)
|
||
|
{
|
||
|
$output.='<h3>Found Errors</h3>';
|
||
|
foreach ($errors as $proc)
|
||
|
{
|
||
|
$output.='<b>Process id: </b> '.$proc['id'].'<br/>';
|
||
|
$output.='<b>Process name: </b> '.$proc['name'].'<br/>';
|
||
|
$output.='<b># Attempts: </b> '.$proc['attempts'].'<br/>';
|
||
|
$output.='<b>Timestamp: </b> '.$proc['timestamp'].'<br/>';
|
||
|
$output.='<b>Message: </b> '.$proc['message'].'<br/>';
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
} else
|
||
|
{
|
||
|
drupal_set_message(t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".',array('%pid'=>$pid,'%solr'=>$host.':'.$port.'/'.$appName.'/')));
|
||
|
|
||
|
}
|
||
|
} catch (Exception $e)
|
||
|
{
|
||
|
drupal_set_message(t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid'=>$pid,'%solr'=>$host.':'.$port.'/'.$appName.'/','%msg'=>$e->getMessage())));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
$output .= drupal_get_form('islandora_workflow_client_search');
|
||
|
|
||
|
return $output;
|
||
|
}
|
||
|
|
||
|
|
||
|
function islandora_workflow_client_cron()
|
||
|
{
|
||
|
module_load_include('inc', 'fedora_repository', 'api/fedora_item');
|
||
|
module_load_include('inc', 'islandora_workflow_client', 'workflow');
|
||
|
ob_start();
|
||
|
echo 'workflow client running'."\n";
|
||
|
$con = new Stomp(variable_get('fedora_stomp_url', 'tcp://localhost:61613'));
|
||
|
$queue='/queue/fedora.apim.update';
|
||
|
$con->subscribe($queue);
|
||
|
$messagesToSend=array();
|
||
|
for ($i=0;$i<50;$i++) {
|
||
|
$msg = $con->readFrame();
|
||
|
|
||
|
if ($msg != null) {
|
||
|
$con->ack($msg);
|
||
|
|
||
|
$xmlobj = simplexml_load_string($msg->body);
|
||
|
$dsid=null; $pid=null;
|
||
|
|
||
|
|
||
|
$logMessage = '';
|
||
|
foreach ($xmlobj->category as $cat) {
|
||
|
switch ($cat['scheme']) {
|
||
|
case 'fedora-types:dsID':
|
||
|
$dsid=(string)$cat['term'];
|
||
|
break;
|
||
|
case 'fedora-types:pid':
|
||
|
$pid=(string)$cat['term'];
|
||
|
break;
|
||
|
case 'fedora-types:logMessage':
|
||
|
$logMessage=(string)$cat['term'];
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
echo $pid.' '.$xmlobj->title ."\n";
|
||
|
if ($pid !== NULL && $xmlobj->title =='purgeObject') { // delete the object from the solr index.
|
||
|
if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FAlSE) {
|
||
|
echo t('Unable to load Solr/Service from islandora_solr_search module on "%pid" for process solr_index.', array('%pid' => $pid)).'<br/>';
|
||
|
} else {
|
||
|
$host = variable_get('islandora_solr_search_block_host', 'localhost');
|
||
|
$port = variable_get('islandora_solr_search_block_port', '8080');
|
||
|
$appName = variable_get('islandora_solr_search_block_app_name', 'solr');
|
||
|
$solr = new Apache_Solr_Service($host, $port, '/'. $appName .'/');
|
||
|
|
||
|
try {
|
||
|
if ($solr->ping()) {
|
||
|
$solr->deleteById($pid);
|
||
|
$solr->commit();
|
||
|
$solr->optimize();
|
||
|
} else {
|
||
|
echo t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".', array('%pid' => $pid, '%solr' => $host .':'. $port . '/'. $appName .'/')).'<br/>';
|
||
|
}
|
||
|
}
|
||
|
catch (Exception $e) {
|
||
|
echo t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid' => $pid, '%solr' => $host .':'. $port .'/'. $appName .'/', '%msg' => $e->getMessage())).'<br/>';
|
||
|
}
|
||
|
}
|
||
|
|
||
|
} else if ($pid !== NULL && ($wf= Workflow::loadFromObject($pid)) !== FALSE) {
|
||
|
|
||
|
if (!$wf->validate()) {
|
||
|
// var_dump(Workflow::$errors);
|
||
|
$con->send($queue,$msg->body);
|
||
|
|
||
|
} else {
|
||
|
|
||
|
$saveFlag=FALSE;
|
||
|
if ($dsid !== NULL) {
|
||
|
$saveFlag = $wf->resetDependancies($dsid,$logMessage);
|
||
|
}
|
||
|
$procs = $wf->getReadyProcesses();
|
||
|
|
||
|
foreach ($procs as $id=>$proc) {
|
||
|
|
||
|
if (module_load_include('inc','islandora_workflow_client','plugins/'.$proc)!==FALSE && class_exists($proc))
|
||
|
{
|
||
|
$procClass = new $proc($wf,$id);
|
||
|
$procClass->run();
|
||
|
$saveFlag = true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// saves the workflow. Note that this will also triger a JMS message so if there are still things
|
||
|
// to do in the workflow, then it will be put back on the queue.
|
||
|
if ($saveFlag)
|
||
|
{
|
||
|
// echo htmlentities($wf->dumpXml()); exit();
|
||
|
$wf->saveToFedora();
|
||
|
} else if (count($wf->getReadyProcesses()) > 0) {
|
||
|
// If there is still more to do and we dont need to save the workflow datastream, then
|
||
|
// that means there is a process that does not have a corresponding plugin (perhaps it is handled
|
||
|
// by another client?), so put it back on the queue.
|
||
|
|
||
|
$messagesToSend[] = $msg->body;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
echo '.';
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Messages to send are sent out after we're done
|
||
|
// so that we dont try and process messages we put back on the queue immediately.
|
||
|
foreach ($messagesToSend as $msg) {
|
||
|
$con->send($queue,$msg);
|
||
|
}
|
||
|
|
||
|
$dump=ob_get_contents();
|
||
|
ob_end_clean();
|
||
|
if (trim($dump) !== '') {
|
||
|
watchdog('workflow','<pre>'.$dump.'</pre>',array(),WATCHDOG_NOTICE);
|
||
|
}
|
||
|
}
|