Avatar

Please consider registering
guest

sp_LogInOut Log In sp_Registration Register

Register | Lost password?
Advanced Search

— Forum Scope —




— Match —





— Forum Options —





Minimum search word length is 3 characters - maximum search word length is 84 characters

sp_Feed Topic RSS sp_TopicIcon
PubSub Configuration
February 16, 2022
15:18, EET
Avatar
pradeep_patel
Member
Members
Forum Posts: 19
Member Since:
July 13, 2021
sp_UserOfflineSmall Offline

Hi,
Consider scenario where I want to configure multiple writers for the same transport setting or broker. Should I need to instantiate multiple pubsub configuration per writer or is there a way to pass list of writers (from the same publisher) to pubsub configuration ( pointing to some broker). How should we model our Subscriber client (in context to pubsub configuration using prosys sdk) for supporting multiple writerIDs from same Publisher publishing network messages to same broker.

February 16, 2022
17:22, EET
Avatar
Bjarne Boström
Moderator
Moderators
Forum Posts: 983
Member Since:
April 3, 2012
sp_UserOfflineSmall Offline

Hi,

Can you clarify that a bit more?

Like, in “code terms”, “yes”, you can add more than one PubSubXXXDataSetReaderConf to PubSubXXXReaderGroupConf and multiple of those to PubSubXXXConnectionConf and multiple of those to PubSubSystemConf (via Builders as the XXXConf are immutable, but it was easier to say it like that). However, I’m not sure is this what you are asking?

Also, sorry, but we still have mostly tested the “one of each” (one connection, group and reader/writer) with the samples. I can say that we are currently finally working on making some configuration file support and a more general way to handle the configuration objects (a bit like we have StructureSpecification for Structures). Once this is in place it is also a lot easier for ourselves to test more complex configurations.

Also, we have at least one known issue for MQTT, pretty much having a different queuenames wihtin a group or connection probably wont work as expected, preferably that too will be fixed in the next release.

February 18, 2022
9:09, EET
Avatar
pradeep_patel
Member
Members
Forum Posts: 19
Member Since:
July 13, 2021
sp_UserOfflineSmall Offline

Okay thanks Bjarne Bostrum I will try out what you have suggested in your reply, just to clarify what I am looking for is that —

One PubSub connection can have a number of writer groups/ writerIDs or Reader groups/reader IDs. If I want to pass the writer Ids belonging to a Writer Group as list is that possible ? i.e. in initWriterGroup method we are setting writer id as single value

builder.setName(SAMPLE_WRITER_GROUP_NAME);
builder.setWriterGroupId(SAMPLE_WRITER_GROUP_ID); // If I want to pass list of writer ids then what would be the best way ?

February 18, 2022
11:27, EET
Avatar
Bjarne Boström
Moderator
Moderators
Forum Posts: 983
Member Since:
April 3, 2012
sp_UserOfflineSmall Offline

No; I said system can have any number of connections, that can have any number of groups, that can have any number of readers/writers. Each level has a name and an ID. The WriterGroup doesn’t have “WriterIds”, it has writers, that have ids. The group also has it’s own ID. This is to say the builder.setWriterGroupId(SAMPLE_WRITER_GROUP_ID); is for setting the WriterGroup’s ID. This is “completely separate” from the Writer’s ID’s.

This is to say, you must add each writer via the groupBuilder.addOrReplaceWriter(..) and pass the whole Writer configuration object (which then has the ID for the Writer).

I hope this doesn’t confuse you more, that part of the sample is in the current development tree maybe a bit more clear, but note that you cannot use the following code “as-is” as it has changes not yet released (and might not, but hopefully they will in the next release), but instead the point is to convey the general idea:

  private void addVariableDataSetWriter(PubSubWriterGroupConf.Builder group) {
    PubSubDataSetWriterConf.Builder builder = group.getTransportProfile().createDataSetWriterConfBuilder();
    builder.setName(SAMPLE_VARIABLE_DATA_SET_WRITER_NAME);
    builder.setDataSetWriterId(SAMPLE_VARIABLE_DATASET_WRITER_ID);

    builder.setDataSetName(variableDataSet.getName());

    builder.setKeyFrameCount(UnsignedInteger.valueOf(SAMPLE_KEYFRAME_COUNT));

    if (builder instanceof PubSubUadpDataSetWriterConf.Builder) {
      initUadpDataSetWriter((PubSubUadpDataSetWriterConf.Builder) builder);
    }
    if (builder instanceof PubSubJsonDataSetWriterConf.Builder) {
      initJsonDataSetWriter((PubSubJsonDataSetWriterConf.Builder) builder, reversibleJson);
    }
    if (builder instanceof PubSubBrokerTransportSettingsConf.Builder) {
      initBrokerTransportSettings((PubSubBrokerTransportSettingsConf.Builder) builder, queueName, metadataQueueName);
    }
    group.addOrReplaceWriter(builder.build());
  }

Also note that is not yet 100% usable for adding more than one, since each writer in a group must have unique name. Note that the WriterId (the builder.setDataSetWriterId(SAMPLE_VARIABLE_DATASET_WRITER_ID);) for each writer must be unique for “the PublisherId” (https://reference.opcfoundation.org/v104/Core/docs/Part14/6.2.3/), so basically in a connection, unless more than one connection has the same publisherid (which might be what should be done, but this is a bit unclear to me at this time).

February 23, 2022
7:47, EET
Avatar
pradeep_patel
Member
Members
Forum Posts: 19
Member Since:
July 13, 2021
sp_UserOfflineSmall Offline

Another question is that once I invoke start() on pubsubSystem can I incrementally add reader/ writer to it or it just requires restart.

i.e before we start pubsubSystem we configure reader and reader group as below
mqttConnectionConf.addOrReplaceGroup(createJsonReaderGroup(SAMPLE_READER_GROUP_NAME, jsonReaderConf));
then we invoke pubSubSystem.start();

now if we want to use existing instance of pubsubSystem to add more readers/writers to it, is that possible efficiently ?

also I found updateConfiguration method on pubSubSystem, can we update reader/writer using pubSubSystem.updateConfiguration(arg0) ?

February 23, 2022
16:18, EET
Avatar
Bjarne Boström
Moderator
Moderators
Forum Posts: 983
Member Since:
April 3, 2012
sp_UserOfflineSmall Offline

In general the idea has been that it is update-able while it runs. Every configuration update happens via PubSubSystem.updateConfiguration.

As a sidenote, since you say specifically “updateConfiguration(arg0)” as in ‘arg0’, it would imply that you have not attached the javadocs, I would recommend to do so. If it would be attached, you should have said ” updateConfiguration(PubSubFunction changeOperation)” as in ‘changeOperation’, not ‘arg0’.

Anyway, you will pass in a function that will receive the existing live configuration as the input and then transform it to be the configuration the system should be. Internally things are added, removed or updated as the result.

Note that I did some fixes on this on 4.7.0, but it still is not perfect. You might (and more this in the earlier versions) need to do first remove via one updateConfiguration call and then add it back in a second one (update is separate from add+remove). Also, I should note that we have not implemented DataSetMetaData versioning yet, thus updating a DataSet might cause problems for subscribers made with the SDK. For MQTT-JSON it is probably less of a problem than for UDP-UADP and MQTT-UADP.

P.S.
The updateConfiguration-function-as-input design allows it to be edited from multiple Threads (once we have address-space based configuration some day this will be a thing most likely). A more traditional setConfiguration(conf) could cause an update operation to be missed if 2 Threads do getConfiguration+setConfiguration. Now one of them (the one who runs later) is certain to receive the other’s changes as the input (due to the synchronization only one of them runs at once, but due to thread schedulings if they would capture the configuration outside of the function they could both get the same initial configuration if the first one has not yet run).

March 15, 2022
16:53, EET
Avatar
pradeep_patel
Member
Members
Forum Posts: 19
Member Since:
July 13, 2021
sp_UserOfflineSmall Offline

Hi Bjarne Boström,
I tried below code and it worked. I wanted to add new writers along with the existing one so first I removed the connection and then added new writers as below.

// first added mqttjsonconnections with writers and then updated it after the pubsubsystem has started
pubSubSystem.start();
pubSubSystem.updateConfiguration(new PubSubFunction() {
@Override
public PubSubSystemConf apply(PubSubSystemConf conf) throws PubSubException {
return conf.withConnectionRemoved(String.valueOf(432));
}
});
pubSubSystem.updateConfiguration(new PubSubFunction() {
@Override
public PubSubSystemConf apply(PubSubSystemConf conf) throws PubSubException {
return conf.withConnectionAddedOrReplaced(createMqttJsonConnection(UnsignedShort.valueOf(433)));
}
});

Is this a right approach , also I have to add the old writers again that I have removed just to add the new writers and readers.

Please confirm on the approach.

March 16, 2022
13:30, EET
Avatar
Bjarne Boström
Moderator
Moderators
Forum Posts: 983
Member Since:
April 3, 2012
sp_UserOfflineSmall Offline

Hi,

For the time being, yes. Basically it can be though as a workaround-tier solution to a bug or “missing feature”, i.e. it is not intended to be forever like that. Once it works as intended (in some future version of the SDK; hopefully soon), you would need to only do one call that adds things.

While a bit out of scope, I’ll want to mention that I’m not sure does that compile as is, since “new PubSubFunction() {” misses the generics, i.e. it should be “new PubSubFunction_GENERICS_here() {“. NOTE! Actually most likely what happened is that the forum removed the angle brackets from your post, so this note only applies to others reading the code.

However, since PubSub editions are in the Java 8+ world, you can use lambdas, thus those would transform to the following one-liners, which look nicer:

pubSubSystem.updateConfiguration(conf -> conf.withConnectionRemoved(String.valueOf(432)));
pubSubSystem.updateConfiguration(
conf -> conf.withConnectionAddedOrReplaced(createMqttJsonConnection(UnsignedShort.valueOf(433))));

(If you have more than one statement you would need to use the block i.e. “{}” version of the lambda).

Forum Timezone: Europe/Helsinki

Most Users Ever Online: 518

Currently Online:
19 Guest(s)

Currently Browsing this Page:
1 Guest(s)

Top Posters:

hbrackel: 135

pramanj: 86

Francesco Zambon: 81

rocket science: 77

ibrahim: 75

Sabari: 62

kapsl: 57

gjevremovic: 49

Xavier: 43

fred: 41

Member Stats:

Guest Posters: 0

Members: 682

Moderators: 16

Admins: 1

Forum Stats:

Groups: 3

Forums: 15

Topics: 1467

Posts: 6259

Newest Members:

fidelduke938316, Jan-Pfizer, DavidROunc, fen.pang@woodside.com, aytule, rashadbrownrigg, christi10l, ahamad1, Flores Frederick, ellenmoss

Moderators: Jouni Aro: 1009, Otso Palonen: 32, Tuomas Hiltunen: 5, Pyry: 1, Petri: 0, Bjarne Boström: 983, Heikki Tahvanainen: 402, Jukka Asikainen: 1, moldzh08: 0, Jimmy Ni: 26, Teppo Uimonen: 21, Markus Johansson: 42, Niklas Nurminen: 0, Matti Siponen: 321, Lusetti: 0, Ari-Pekka Soikkeli: 5

Administrators: admin: 1