'Islandora Workflow Client', 'description' => 'Manage Islandora Workflows', 'page callback' => 'islandora_workflow_client_manage', 'access arguments' => array('administer site configuration'), 'type' => MENU_NORMAL_ITEM ); return $items; }

/**
 * Submit handler for the workflow search form.
 *
 * Redirects to
 * admin/settings/workflow_client/<process name>/<process id>/<collection pid>.
 * An empty process id or collection pid is replaced by the placeholder '-',
 * which islandora_workflow_client_manage() treats as "no filter".
 *
 * @param $form
 *   The submitted form; the entered values are read from each element's
 *   '#value'.
 * @param $form_state
 *   The form state (not used).
 */
function islandora_workflow_client_search_submit($form, &$form_state) {
  $process_name = trim($form['process_name']['#value']);
  $process_id = trim($form['process_id']['#value']);
  $collection_pid = trim($form['collection_pid']['#value']);

  $segments = array('admin/settings/workflow_client');
  if ($process_name !== '') {
    // Each placeholder is a single path segment. The previous '/-/' form left
    // an empty segment ('//') in the URL whenever another segment followed.
    $segments[] = $process_name;
    $segments[] = ($process_id !== '' ? $process_id : '-');
    $segments[] = ($collection_pid !== '' ? $collection_pid : '-');
  }

  drupal_goto(implode('/', $segments));
}

/**
 * Form builder for the workflow search form.
 *
 * @param $form_state
 *   The form state (not used).
 * @param $terms
 *   Default value for the process name field.
 * @param $process_id
 *   Default value for the process id field.
 * @param $collection
 *   Default value for the collection pid field.
 *
 * @return
 *   The form array.
 */
function islandora_workflow_client_search(&$form_state, $terms = null, $process_id = null, $collection = null) {
  module_load_include('inc', 'fedora_repository', 'api/fedora_utils');

  $form = array();

  $form['process_name'] = array(
    '#type' => 'textfield',
    '#required' => TRUE,
    '#title' => t('Search by Process Name'),
    '#default_value' => ($terms != null ? $terms : ''),
    '#description' => t('Returns a list of objects that match the process name(s) entered. Separate multiple names by spaces.'),
  );

  $form['process_id'] = array(
    '#type' => 'textfield',
    '#title' => t('Search by Process ID'),
    '#default_value' => ($process_id != null ? $process_id : ''),
    '#description' => t('Returns only objects that match the also match the process id entered. '),
  );

  $form['collection_pid'] = array(
    '#type' => 'textfield',
    '#title' => t('Search by Collection PID'),
    '#default_value' => ($collection != null ? $collection : ''),
    '#description' => t('Returns only objects that match the also match the collection pid(s) entered. 
Separate multiple PIDs by spaces.'),
  );

  $form['submit'] = array('#type' => 'submit', '#value' => t('Search'));

  return $form;
}

/**
 * Page callback: searches Solr for workflow processes and lists their states.
 *
 * For every process name found, the table shows how many matching processes
 * are waiting, completed or in error, together with links that call this page
 * again with $queue / $queueProcess set. When those are set, the matching
 * processes are put back into the 'waiting' state, the workflows are saved and
 * the browser is redirected to the same search without the queue arguments.
 * The search form is always appended to the output.
 *
 * @param $terms
 *   Whitespace-separated process names to search for.
 * @param $process_id
 *   Optional process id filter; 'none' and '-' mean no filter.
 * @param $collection
 *   Optional whitespace-separated collection PIDs; 'none' and '-' mean no
 *   filter.
 * @param $queue
 *   'queue' to re-queue every matching process, 'errorQueue' to re-queue only
 *   those whose state is 'error'.
 * @param $queueProcess
 *   Name of the process the $queue action applies to.
 *
 * @return
 *   The rendered page content.
 */
function islandora_workflow_client_manage($terms = null, $process_id = null, $collection = null, $queue = null, $queueProcess = null) {
  // 'none' and '-' are the URL placeholders for an unset filter.
  if ($collection == 'none' || $collection == '-') {
    $collection = null;
  }
  if ($process_id == 'none' || $process_id == '-') {
    $process_id = null;
  }

  $output = '';

  if (trim($terms) != '') {
    module_load_include('inc', 'fedora_repository', 'api/fedora_item');
    module_load_include('inc', 'islandora_workflow_client', 'workflow');

    if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE) {
      drupal_set_message(t('Error: Islandora_solr_search module is required to search by process.'));
    }
    else {
      $host = variable_get('islandora_solr_search_block_host', 'localhost');
      $port = variable_get('islandora_solr_search_block_port', '8080');
      $appName = variable_get('islandora_solr_search_block_app_name', 'solr');
      $solr = new Apache_Solr_Service($host, $port, '/' . $appName . '/');
      $solr_location = $host . ':' . $port . '/' . $appName . '/';

      try {
        if ($solr->ping()) {
          // One clause per search term. PREG_SPLIT_NO_EMPTY keeps stray
          // whitespace from producing empty (invalid) clauses.
          $term_clauses = array();
          foreach (preg_split('/\s+/', trim($terms), -1, PREG_SPLIT_NO_EMPTY) as $bit) {
            // NOTE(review): htmlentities() is not Solr query escaping; kept
            // so that existing searches keep returning the same results.
            $term_clauses[] = 'workflow_process_t:' . htmlentities($bit);
          }
          $query = join(' OR ', $term_clauses);

          if (trim($collection) != '') {
            $collection_clauses = array();
            foreach (preg_split('/\s+/', trim($collection), -1, PREG_SPLIT_NO_EMPTY) as $bit) {
              $collection_clauses[] = 'related_item_identifier_t:' . htmlentities(preg_replace('/\:/', '/', $bit));
            }
            // Group the term clauses so the collection filter applies to all
            // of them rather than only to the last term.
            $query = '(' . $query . ') AND (' . join(' OR ', $collection_clauses) . ')';
          }

          $results = $solr->search($query, 0, 100);

          $pids = array();
          // Maps process name => list of object PIDs that carry it.
          $processes = array();
          if ($results->response->numFound == 0) {
            drupal_set_message(t('No processes found.'));
          }
          else {
            foreach ($results->response->docs as $doc) {
              $id = preg_replace('/\//', ':', $doc->id);
              $pids[] = $id;

              // The field holds either a single name or an array of names.
              $names = is_array($doc->workflow_process_t) ? $doc->workflow_process_t : array($doc->workflow_process_t);
              foreach ($names as $process) {
                $processes[$process][] = $id;
              }
            }
          }

          $workflows = array();
          foreach ($pids as $pid) {
            $workflows[$pid] = Workflow::loadFromObject($pid);
          }

          if (count($processes) > 0) {
            $errors = array();
            $headers = array('Process Name', '# Waiting to Run', '# Completed', '# Errors', 'Action');
            $rows = array();

            // Path of the current search; shared by the action links and by
            // the redirect that follows a queue action.
            $search_path = 'admin/settings/workflow_client/' . $terms
              . '/' . (trim($process_id) == '' ? 'none' : $process_id)
              . '/' . (trim($collection) == '' ? 'none' : $collection);

            foreach ($processes as $name => $process_pids) {
              $errCount = 0;
              $waitCount = 0;
              $completeCount = 0;
              $display = false;

              foreach ($process_pids as $pid) {
                if (!isset($workflows[$pid]) || $workflows[$pid] === false) {
                  continue;
                }

                $display = true;
                $updated = FALSE;

                foreach ($workflows[$pid]->getProcesses() as $id => $n) {
                  if (($process_id != null && $id != $process_id) || $name != $n) {
                    continue;
                  }

                  $proc = $workflows[$pid]->getProcess($id);

                  if (($queue == 'queue' || ($queue == 'errorQueue' && $proc['state'] == 'error')) && $queueProcess == $n) {
                    $workflows[$pid]->setState($id, 'waiting');
                    $updated = TRUE;
                  }

                  // Counts use the state read before any re-queue above.
                  switch ($proc['state']) {
                    case 'completed':
                      $completeCount++;
                      break;

                    case 'waiting':
                      $waitCount++;
                      break;

                    case 'error':
                      $errCount++;
                      $errors[] = $proc;
                      break;
                  }
                }

                // Save once per workflow, after all of its processes have
                // been examined, instead of once per remaining process.
                if ($updated) {
                  $workflows[$pid]->saveToFedora();
                }
              }

              if ($display) {
                $rows[] = array(
                  check_plain($name),
                  $waitCount,
                  $completeCount,
                  $errCount,
                  l("Add All to \nQueue", $search_path . '/queue/' . $name)
                    . "\n"
                    . l('Add Errors to Queue', $search_path . '/errorQueue/' . $name),
                );
              }
            }

            if ($queue == 'queue' || $queue == 'errorQueue') {
              // Return to the same search (all filters kept) without the
              // queue arguments, so a page reload does not re-queue again.
              drupal_goto($search_path);
            }

            // $terms, $process_id and $collection come from the URL, so they
            // are escaped before being placed in the page.
            $output .= "\n\nSearch for "
              . ($process_id != null ? 'process id ' . check_plain($process_id) . ' with terms ' : '')
              . ' "' . check_plain($terms) . '" '
              . (trim($collection) != '' ? 'in collection(s) "' . check_plain($collection) . '" ' : '')
              . "returned Processes:\n\n";
            $output .= theme('table', $headers, $rows);

            if (count($errors) > 0) {
              $output .= "\n\nFound Errors\n\n";
              foreach ($errors as $proc) {
                $output .= 'Process id: ' . check_plain($proc['id']) . "\n";
                $output .= 'Process name: ' . check_plain($proc['name']) . "\n";
                $output .= '# Attempts: ' . check_plain($proc['attempts']) . "\n";
                $output .= 'Timestamp: ' . check_plain($proc['timestamp']) . "\n";
                $output .= 'Message: ' . check_plain($proc['message']) . "\n";
              }
            }
          }
        }
        else {
          // No object pid is involved at this point, so none is reported.
          drupal_set_message(t('Unable to connect to Solr at "%solr".', array('%solr' => $solr_location)));
        }
      }
      catch (Exception $e) {
        drupal_set_message(t('Caught exception from Solr at %solr: %msg', array('%solr' => $solr_location, '%msg' => $e->getMessage())));
      }
    }
  }

  // Arguments after the form id are handed to the form builder in order,
  // following its $form_state parameter.
  $output .= drupal_get_form('islandora_workflow_client_search', $terms, $process_id, $collection);

  return $output;
}

/**
 * Cron handler: processes Fedora update messages from the Stomp queue.
 *
 * Reads up to 100 frames from /queue/fedora.apim.update. A 'purgeObject'
 * message removes the object from the Solr index. For any other message whose
 * object has a workflow, the ready processes are run through their plugin
 * classes and the workflow is saved. Everything echoed while running is
 * captured and written to watchdog.
 */
function islandora_workflow_client_cron() {
  module_load_include('inc', 'fedora_repository', 'api/fedora_item');
  module_load_include('inc', 'islandora_workflow_client', 'workflow');

  ob_start();
  echo 'workflow client running' . "\n";

  $con = new Stomp(variable_get('fedora_stomp_url', 'tcp://localhost:61613'));
  $queue = '/queue/fedora.apim.update';
  $con->subscribe($queue);

  $messagesToSend = array();

  for ($i = 0; $i < 100; $i++) {
    $msg = $con->readFrame();
    if ($msg == null) {
      echo '.';
      continue;
    }

    $con->ack($msg);

    $xmlobj = simplexml_load_string($msg->body);
    if ($xmlobj === FALSE) {
      // The frame is already acknowledged; nothing usable can be read from it.
      echo 'Unable to parse message body as XML; skipping message.' . "\n";
      continue;
    }

    $dsid = null;
    $pid = null;
    $logMessage = '';
    foreach ($xmlobj->category as $cat) {
      switch ((string) $cat['scheme']) {
        case 'fedora-types:dsID':
          $dsid = (string) $cat['term'];
          break;

        case 'fedora-types:pid':
          $pid = (string) $cat['term'];
          break;

        case 'fedora-types:logMessage':
          $logMessage = (string) $cat['term'];
          break;
      }
    }

    $title = (string) $xmlobj->title;
    echo $pid . ' ' . $title . "\n";

    if ($pid !== NULL && $title == 'purgeObject') {
      // Delete the object from the solr index.
      if (module_load_include('php', 'islandora_solr_search', 'Solr/Service') === FALSE) {
        echo t('Unable to load Solr/Service from islandora_solr_search module on "%pid" for process solr_index.', array('%pid' => $pid)) . "\n";
      }
      else {
        $host = variable_get('islandora_solr_search_block_host', 'localhost');
        $port = variable_get('islandora_solr_search_block_port', '8080');
        $appName = variable_get('islandora_solr_search_block_app_name', 'solr');
        $solr = new Apache_Solr_Service($host, $port, '/' . $appName . '/');

        try {
          if ($solr->ping()) {
            $solr->deleteById($pid);
            $solr->commit();
            $solr->optimize();
          }
          else {
            echo t('Unable to connect to Solr at "%solr" for solr_index process on "%pid".', array('%pid' => $pid, '%solr' => $host . ':' . $port . '/' . $appName . '/')) . "\n";
          }
        }
        catch (Exception $e) {
          echo t('Caught exception from Solr at %solr for solr_index process on "%pid": %msg', array('%pid' => $pid, '%solr' => $host . ':' . $port . '/' . $appName . '/', '%msg' => $e->getMessage())) . "\n";
        }
      }
    }
    else if ($pid !== NULL && ($wf = Workflow::loadFromObject($pid)) !== FALSE) {
      if (!$wf->validate()) {
        // var_dump(Workflow::$errors);
        // NOTE(review): the message goes straight back on the queue, so it may
        // be read again later in this same run.
        $con->send($queue, $msg->body);
      }
      else {
        $saveFlag = FALSE;
        if ($dsid !== NULL) {
          $saveFlag = $wf->resetDependancies($dsid, $logMessage);
        }

        $procs = $wf->getReadyProcesses();
        foreach ($procs as $id => $proc) {
          // A process is run only when a plugin file and class of the same
          // name exist.
          if (module_load_include('inc', 'islandora_workflow_client', 'plugins/' . $proc) !== FALSE && class_exists($proc)) {
            $procClass = new $proc($wf, $id);
            $procClass->run();
            $saveFlag = true;
          }
        }

        // Saves the workflow. Note that this will also trigger a JMS message,
        // so if there are still things to do in the workflow, it will be put
        // back on the queue.
        if ($saveFlag) {
          // echo htmlentities($wf->dumpXml()); exit();
          $wf->saveToFedora();
        }
        else if (count($wf->getReadyProcesses()) > 0) {
          // If there is still more to do and we don't need to save the
          // workflow datastream, then there is a process that does not have a
          // corresponding plugin (perhaps it is handled by another client?),
          // so put it back on the queue.
          $messagesToSend[] = $msg->body;
        }
      }
    }
  }

  // Messages to send are sent out after we're done, so that we don't try to
  // process messages we put back on the queue immediately.
  foreach ($messagesToSend as $body) {
    $con->send($queue, $body);
  }

  $dump = ob_get_contents();
  ob_end_clean();

  if (trim($dump) !== '') {
    watchdog('workflow', "\n" . $dump . "\n", array(), WATCHDOG_NOTICE);
  }
}