Browse Source
* Broadcaster rules action. * Re-exporting rules * Tests * Updating travis environment for testing with activemqpull/756/head
dannylamb
8 years ago
committed by
Jared Whiklo
27 changed files with 475 additions and 77 deletions
@ -1,25 +1,31 @@ |
|||||||
|
uuid: 29ea11d1-4267-4bc1-bdc0-550e58833c0b |
||||||
langcode: en |
langcode: en |
||||||
status: true |
status: true |
||||||
dependencies: |
dependencies: |
||||||
|
enforced: |
||||||
|
module: |
||||||
|
- islandora |
||||||
config: |
config: |
||||||
- field.storage.fedora_resource.rdf_source.field_ldp_contains |
- field.storage.fedora_resource.field_ldp_contains |
||||||
- islandora.fedora_resource_type.non_rdf_source |
|
||||||
- islandora.fedora_resource_type.rdf_source |
- islandora.fedora_resource_type.rdf_source |
||||||
|
_core: |
||||||
|
default_config_hash: y-E4uXZywn0AAVWULpj46kKRevLvEFQup5pLJ2J9jsk |
||||||
id: fedora_resource.rdf_source.field_ldp_contains |
id: fedora_resource.rdf_source.field_ldp_contains |
||||||
field_name: field_ldp_contains |
field_name: field_ldp_contains |
||||||
entity_type: fedora_resource |
entity_type: fedora_resource |
||||||
bundle: rdf_source |
bundle: rdf_source |
||||||
label: ldp:contains |
label: 'ldp:contains' |
||||||
description: 'Contains Fedora Resource.' |
description: 'Contains Fedora Resource.' |
||||||
required: false |
required: false |
||||||
translatable: false |
translatable: false |
||||||
default_value: { } |
default_value: { } |
||||||
default_value_callback: '' |
default_value_callback: '' |
||||||
settings: |
settings: |
||||||
handler: 'default:fedora_resource' |
handler: views |
||||||
handler_settings: |
handler_settings: |
||||||
view: |
view: |
||||||
view_name: bundles_of_fedora |
view_name: fedora_entities_reference |
||||||
display_name: entity_reference_1 |
display_name: entity_reference_1 |
||||||
arguments: { } |
arguments: { } |
||||||
field_type: entity_reference |
field_type: entity_reference |
||||||
|
|
||||||
|
@ -1,8 +1,12 @@ |
|||||||
|
uuid: d40c0b76-1a6d-4dd2-8f0b-2945e30d2832 |
||||||
langcode: en |
langcode: en |
||||||
status: true |
status: true |
||||||
dependencies: |
dependencies: |
||||||
|
enforced: |
||||||
module: |
module: |
||||||
- islandora |
- islandora |
||||||
|
_core: |
||||||
|
default_config_hash: jkwDP8hekY3eWzIrEN4yinGftfXZlArUaz7PsiU6BcM |
||||||
id: fedora_resource.field_ldp_contains |
id: fedora_resource.field_ldp_contains |
||||||
field_name: field_ldp_contains |
field_name: field_ldp_contains |
||||||
entity_type: fedora_resource |
entity_type: fedora_resource |
@ -1,3 +1,11 @@ |
|||||||
|
uuid: 68d5a1cb-e493-4e9d-adad-21229a41ac68 |
||||||
langcode: en |
langcode: en |
||||||
|
status: true |
||||||
|
dependencies: |
||||||
|
enforced: |
||||||
|
module: |
||||||
|
- islandora |
||||||
|
_core: |
||||||
|
default_config_hash: KgkkNCMXV1urHhxyo7wuwYPWaEAnyLAXvwrRnZWDmtw |
||||||
id: non_rdf_source |
id: non_rdf_source |
||||||
label: 'Non RDF Source' |
label: 'Non RDF Source' |
@ -1,3 +1,11 @@ |
|||||||
|
uuid: af1cb3d8-2c1f-4ca1-a1c2-b1173704807f |
||||||
langcode: en |
langcode: en |
||||||
|
status: true |
||||||
|
dependencies: |
||||||
|
enforced: |
||||||
|
module: |
||||||
|
- islandora |
||||||
|
_core: |
||||||
|
default_config_hash: 61Pa7qgwWd8HBTCgoxF3RxTsvCBcmHNhfg5k8FPqEpQ |
||||||
id: rdf_source |
id: rdf_source |
||||||
label: 'RDF Source' |
label: 'RDF Source' |
||||||
|
@ -1,4 +1,6 @@ |
|||||||
broker_url: 'http://localhost:61613' |
broker_url: 'tcp://localhost:61613' |
||||||
triplestore_index_queue: '/islandora/triplestore/index' |
|
||||||
fedora_rest_endpoint: 'http://localhost:8080/fcrepo/rest' |
fedora_rest_endpoint: 'http://localhost:8080/fcrepo/rest' |
||||||
fedora_indexing_queue: 'islandora/indexing/fedora' |
_core: |
||||||
|
default_config_hash: nDZDR2rrpXXQ-D_7BYrdDFAXYOsB5hgH6vCAMV5I3w8 |
||||||
|
broadcast_queue: islandora-connector-broadcast |
||||||
|
|
||||||
|
@ -0,0 +1,14 @@ |
|||||||
|
islandora.settings: |
||||||
|
type: config_object |
||||||
|
label: 'Islandora Core Settings' |
||||||
|
mapping: |
||||||
|
broker_url: |
||||||
|
type: string |
||||||
|
label: 'Url to connect to message broker' |
||||||
|
fedora_rest_endpoint: |
||||||
|
type: string |
||||||
|
label: 'Url to Fedora instance' |
||||||
|
broadcast_queue: |
||||||
|
type: string |
||||||
|
label: 'Queue that handles distributing messages amongst multiple recipients' |
||||||
|
|
@ -0,0 +1,125 @@ |
|||||||
|
<?php |
||||||
|
|
||||||
|
namespace Drupal\islandora\Plugin\RulesAction; |
||||||
|
|
||||||
|
use Drupal\Core\Plugin\ContainerFactoryPluginInterface; |
||||||
|
use Drupal\islandora\Form\IslandoraSettingsForm; |
||||||
|
use Drupal\rules\Core\RulesActionBase; |
||||||
|
use Stomp\Exception\StompException; |
||||||
|
use Stomp\StatefulStomp; |
||||||
|
use Stomp\Transport\Message; |
||||||
|
use Symfony\Component\DependencyInjection\ContainerInterface; |
||||||
|
|
||||||
|
/** |
||||||
|
* Provides an action to broadcast an event to multiple queues/topics. |
||||||
|
* |
||||||
|
* @RulesAction( |
||||||
|
* id = "islandora_broadcast", |
||||||
|
* label = @Translation("Broadcast Message"), |
||||||
|
* category = @Translation("Islandora"), |
||||||
|
* context = { |
||||||
|
* "message" = @ContextDefinition("string", |
||||||
|
* label = @Translation("Message") |
||||||
|
* ), |
||||||
|
* "recipients" = @ContextDefinition("string", |
||||||
|
* label = @Translation("Recipients"), |
||||||
|
* description = @Translation("Queues/Topics to receive the message"), |
||||||
|
* multiple = TRUE |
||||||
|
* ), |
||||||
|
* } |
||||||
|
* ) |
||||||
|
*/ |
||||||
|
class Broadcaster extends RulesActionBase implements ContainerFactoryPluginInterface { |
||||||
|
|
||||||
|
/** |
||||||
|
* Stomp client. |
||||||
|
* |
||||||
|
* @var \Stomp\StatefulStomp |
||||||
|
*/ |
||||||
|
protected $stomp; |
||||||
|
|
||||||
|
/** |
||||||
|
* Name of broadcast queue. |
||||||
|
* |
||||||
|
* @var string |
||||||
|
*/ |
||||||
|
protected $broadcastQueue; |
||||||
|
|
||||||
|
/** |
||||||
|
* Constructs a BroadcastAction. |
||||||
|
* |
||||||
|
* @param array $configuration |
||||||
|
* A configuration array containing information about the plugin instance. |
||||||
|
* @param string $plugin_id |
||||||
|
* The plugin ID for the plugin instance. |
||||||
|
* @param mixed $plugin_definition |
||||||
|
* The plugin implementation definition. |
||||||
|
* @param string $broadcast_queue |
||||||
|
* Name of queue that will handle distributing the broadcast. |
||||||
|
* @param \Stomp\StatefulStomp $stomp |
||||||
|
* Stomp client. |
||||||
|
*/ |
||||||
|
public function __construct(array $configuration, $plugin_id, $plugin_definition, $broadcast_queue, StatefulStomp $stomp) { |
||||||
|
parent::__construct($configuration, $plugin_id, $plugin_definition); |
||||||
|
|
||||||
|
$this->broadcastQueue = $broadcast_queue; |
||||||
|
$this->stomp = $stomp; |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* {@inheritdoc} |
||||||
|
*/ |
||||||
|
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) { |
||||||
|
$config = $container->get('config.factory'); |
||||||
|
$settings = $config->get(IslandoraSettingsForm::CONFIG_NAME); |
||||||
|
$broadcastQueue = $settings->get(IslandoraSettingsForm::BROADCAST_QUEUE); |
||||||
|
|
||||||
|
return new static( |
||||||
|
$configuration, |
||||||
|
$plugin_id, |
||||||
|
$plugin_definition, |
||||||
|
$broadcastQueue, |
||||||
|
$container->get('islandora.stomp') |
||||||
|
); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Sends a message to a broadcaster to get distributed. |
||||||
|
* |
||||||
|
* @param string $message |
||||||
|
* Message body to send. |
||||||
|
* @param array $recipients |
||||||
|
* List of queues/topics to broadcast message to. |
||||||
|
*/ |
||||||
|
protected function doExecute($message, array $recipients) { |
||||||
|
// Transform recipients array into comma searated list. |
||||||
|
$recipients = array_map('trim', $recipients); |
||||||
|
$recipients = implode(',', $recipients); |
||||||
|
|
||||||
|
// Transform message from string into a proper message object. |
||||||
|
$message = new Message($message, ['IslandoraBroadcastRecipients' => $recipients]); |
||||||
|
|
||||||
|
// Send the message. |
||||||
|
try { |
||||||
|
$this->stomp->begin(); |
||||||
|
$this->stomp->send($this->broadcastQueue, $message); |
||||||
|
$this->stomp->commit(); |
||||||
|
} |
||||||
|
catch (StompException $e) { |
||||||
|
// Log it. |
||||||
|
\Drupal::logger('islandora')->error( |
||||||
|
'Error publishing message: @msg', |
||||||
|
['@msg' => $e->getMessage()] |
||||||
|
); |
||||||
|
|
||||||
|
// Notify user. |
||||||
|
drupal_set_message( |
||||||
|
t('Error publishing message: @msg', |
||||||
|
['@msg' => $e->getMessage()] |
||||||
|
), |
||||||
|
'error' |
||||||
|
); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,33 @@ |
|||||||
|
<?php |
||||||
|
|
||||||
|
namespace Drupal\islandora; |
||||||
|
|
||||||
|
use Drupal\Core\Config\ConfigFactoryInterface; |
||||||
|
use Drupal\islandora\Form\IslandoraSettingsForm; |
||||||
|
use Stomp\Client; |
||||||
|
use Stomp\StatefulStomp; |
||||||
|
|
||||||
|
/** |
||||||
|
* StatefulStomp static factory. |
||||||
|
*/ |
||||||
|
class StompFactory { |
||||||
|
|
||||||
|
/** |
||||||
|
* Factory function. |
||||||
|
* |
||||||
|
* @param \Drupal\Core\Config\ConfigFactoryInterface $config |
||||||
|
* Config. |
||||||
|
* |
||||||
|
* @return \Stomp\StatefulStomp |
||||||
|
* Stomp client. |
||||||
|
*/ |
||||||
|
static public function create(ConfigFactoryInterface $config) { |
||||||
|
// Get broker url from config. |
||||||
|
$settings = $config->get(IslandoraSettingsForm::CONFIG_NAME); |
||||||
|
$brokerUrl = $settings->get(IslandoraSettingsForm::BROKER_URL); |
||||||
|
|
||||||
|
$client = new Client($brokerUrl); |
||||||
|
return new StatefulStomp($client); |
||||||
|
} |
||||||
|
|
||||||
|
} |
@ -0,0 +1,127 @@ |
|||||||
|
<?php |
||||||
|
|
||||||
|
namespace Drupal\Tests\islandora\Kernel; |
||||||
|
|
||||||
|
use Stomp\Exception\StompException; |
||||||
|
use Stomp\StatefulStomp; |
||||||
|
use Drupal\islandora\Plugin\RulesAction\Broadcaster; |
||||||
|
|
||||||
|
/** |
||||||
|
* Broadcaster tests. |
||||||
|
* |
||||||
|
* @group islandora |
||||||
|
* @coversDefaultClass \Drupal\islandora\Plugin\RulesAction\Broadcaster |
||||||
|
*/ |
||||||
|
class BroadcasterTest extends IslandoraKernelTestBase { |
||||||
|
|
||||||
|
protected $testQueue = 'islandora-broadcaster-test-queue'; |
||||||
|
|
||||||
|
/** |
||||||
|
* Tests that the action does not WSOD Drupal when there's a StompException. |
||||||
|
* |
||||||
|
* @covers \Drupal\islandora\Plugin\RulesAction\Broadcaster::execute |
||||||
|
*/ |
||||||
|
public function testExecuteSquashesStompExceptions() { |
||||||
|
// Set up a fake Stomp client to throw a StompException when used. |
||||||
|
$prophecy = $this->prophesize(StatefulStomp::CLASS); |
||||||
|
$prophecy->begin()->willThrow(new StompException('STOMP EXCEPTION')); |
||||||
|
$stomp = $prophecy->reveal(); |
||||||
|
|
||||||
|
$action = $this->createBroadcaster($stomp); |
||||||
|
|
||||||
|
try { |
||||||
|
// Execute the action. |
||||||
|
$action->execute(); |
||||||
|
$this->assertTrue(TRUE, "The execute() method must squash StompExceptions."); |
||||||
|
} |
||||||
|
catch (\Exception $e) { |
||||||
|
$this->assertTrue(FALSE, "The execute() method must squash StompExceptions."); |
||||||
|
} |
||||||
|
|
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Tests that the action DOES NOT squash any other Exception. |
||||||
|
* |
||||||
|
* @covers \Drupal\islandora\Plugin\RulesAction\Broadcaster::execute |
||||||
|
* @expectedException \Exception |
||||||
|
*/ |
||||||
|
public function testExecuteDoesNotSquashOtherExceptions() { |
||||||
|
// Set up a fake Stomp client to throw a non-StompException when used. |
||||||
|
$prophecy = $this->prophesize(StatefulStomp::CLASS); |
||||||
|
$prophecy->begin()->willThrow(new \Exception('NOT A STOMP EXCEPTION')); |
||||||
|
$stomp = $prophecy->reveal(); |
||||||
|
|
||||||
|
$action = $this->createBroadcaster($stomp); |
||||||
|
|
||||||
|
// This should throw an exception. |
||||||
|
$action->execute(); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Tests that the action publishes the message to be broadcast to a broker. |
||||||
|
* |
||||||
|
* @covers \Drupal\islandora\Plugin\RulesAction\Broadcaster::execute |
||||||
|
*/ |
||||||
|
public function testBrokerIntegration() { |
||||||
|
// Grab a legit stomp client, using values from config. |
||||||
|
$this->installConfig('islandora'); |
||||||
|
$stomp = $this->container->get('islandora.stomp'); |
||||||
|
|
||||||
|
// Create and execute the action. |
||||||
|
$action = $this->createBroadcaster($stomp); |
||||||
|
$action->execute(); |
||||||
|
|
||||||
|
// Verify the message actually got sent. |
||||||
|
try { |
||||||
|
$stomp->subscribe($this->testQueue); |
||||||
|
$msg = $stomp->read(); |
||||||
|
|
||||||
|
$this->assertTrue( |
||||||
|
strcmp($msg->getBody(), 'test') == 0, |
||||||
|
"Message body is not 'test'" |
||||||
|
); |
||||||
|
|
||||||
|
$headers = $msg->getHeaders(); |
||||||
|
$this->assertTrue( |
||||||
|
strcmp($headers['IslandoraBroadcastRecipients'], 'activemq:queue:foo,activemq:queue:bar') == 0, |
||||||
|
"IslandoraBroadcastRecipients header must be a comma separated list of endpoints" |
||||||
|
); |
||||||
|
$stomp->unsubscribe(); |
||||||
|
} |
||||||
|
catch (StompException $e) { |
||||||
|
$this->assertTrue(FALSE, "There was an error connecting to the stomp broker"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* Utility function to create a broadcaster action from a Stomp prophecy. |
||||||
|
* |
||||||
|
* @param StatefulStomp $stomp |
||||||
|
* Stomp instance or prophecy. |
||||||
|
* |
||||||
|
* @return \Drupal\islandora\Plugin\RulesAction\Broadcaster |
||||||
|
* Broadcaster, ready to test. |
||||||
|
*/ |
||||||
|
protected function createBroadcaster(StatefulStomp $stomp) { |
||||||
|
// Pull the plugin definition out of the plugin system. |
||||||
|
$actionManager = $this->container->get('plugin.manager.rules_action'); |
||||||
|
$definitions = $actionManager->getDefinitions(); |
||||||
|
$pluginDefinition = $definitions['islandora_broadcast']; |
||||||
|
|
||||||
|
$action = new Broadcaster( |
||||||
|
[], |
||||||
|
'islandora_broadcast', |
||||||
|
$pluginDefinition, |
||||||
|
$this->testQueue, |
||||||
|
$stomp |
||||||
|
); |
||||||
|
|
||||||
|
// Set the required contexts for the action to run. |
||||||
|
$action->setContextValue('message', "test"); |
||||||
|
$action->setContextValue('recipients', ['activemq:queue:foo', 'activemq:queue:bar']); |
||||||
|
|
||||||
|
return $action; |
||||||
|
} |
||||||
|
|
||||||
|
} |
Loading…
Reference in new issue