IPTP Developer

Developer

  • Getting started
    • Docker
    • MarkDown
    • Merge
  • ESB
    • Configuration
  • XM
    • API
    • Chats
    • ERP Integration
    • Messages
    • Sync
  • IPTP ID
    • Integration
  • Ugene
    • API
    • Error codes
    • How create plugin
  • Live assistance api
    • API

ESB Configuration

Summary

  • ESB is library which is used to synchronize requests from app to another apps.
  • Request format can be XML or JSON
  • Each app will have a separate database (PostgreSQL an MySQL) to store requests.
  • Informations about database, server nodes is written on esb2.json

Configuration file

{
    "localId":"esbhost_ams",
    "port":"2101",
    "database": {
    	"driver":"org.postgresql.Driver",
    	"connection":"jdbc:postgresql://vn-2.dev.iptp.net:9434/esb_ams",
    	"username":"postgres",
    	"password":"123456"
    },
    "isSlave":"false",
    "keyMapper":{"name":"esbhost_ams","host":"localhost","port":"1101"},
    "nodes":[
		{"name":"esbhost_hk","host":"localhost","port":"3101"},
		{"name":"esbhost_la","host":"localhost","port":"4101"}
    ]
}

esb2.json

{
    "localId":"esbhost_ams",
    "port":"2101",
    "database": {
    	"driver":"org.postgresql.Driver",
    	"connection":"jdbc:postgresql://vn-2.dev.iptp.net:9434/esb_ams",
    	"username":"postgres",
    	"password":"123456"
    },
    "isSlave":"false",
    "active":"true",
    "keyMapper":{"name":"esbhost_ams","host":"localhost","port":"1101"},
    "nodes":[
		{"name":"esbhost_hk","host":"localhost","port":"3101"},
		{"name":"esbhost_la","host":"localhost","port":"4101"}
    ]
}

  • localId , port : Current server name and websocket port
  • database : We can set driver, connection string for PostgreSQL or MySQL
  • isSlave : Node is Master or Slave. false:master / true:slave
  • keyMapper
    • While synchronization is made in master-master mode (i.e. request can come from any server in the cluster), there is still one central node, the keyMapper (specified in esb2.json), which is responsible for generating the next ( id, queue) combination for all transactions.
    • This ensures that we don’t have any duplicate keys or breaks in sequence.
  • nodes : information about servers where the request will be sent.

Configuration setup

  • Configuration file is searched for in the following location: $CATALINA_HOME/conf/esb2.json
  • If CATALINA_HOME is not initialized, configuration file is searched in /conf/esb2.json
  • If the configuration file is not found, the ESB will work in local mode: i.e. transactions will not be stored in the database, and will not be synchronized anywhere, ESB will just exist as a placeholder.

Database Structure

SQL for PostgreSQL database creation

--esb-db
create database esb2;
\c esb2
--the name of the sequence must be the same as the name of the queue that we will be using for the current app.
--if there are more than one application using this esb database, you must create same number of sequences as number of apps, with every app have unique sequence/queue name.
create sequence IPTP;
create table transactions (
	id integer not null,
	queue varchar(30) not null,
	trans_date timestamp without time zone default now(),
	request text,
	response text,
	source varchar(30),
	status integer,
	key varchar(200),
	primary key (id, queue)
);

SQL for MySQL database creation

SET GLOBAL log_bin_trust_function_creators = 1;

DROP TABLE IF EXISTS `sequence_data`;
CREATE TABLE `sequence_data` (
  `sequence_name` varchar(100) NOT NULL,
  `sequence_increment` int unsigned NOT NULL DEFAULT '1',
  `sequence_min_value` int unsigned NOT NULL DEFAULT '1',
  `sequence_max_value` bigint unsigned NOT NULL DEFAULT '18446744073709551615',
  `sequence_cur_value` bigint unsigned DEFAULT '1',
  `sequence_cycle` tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (`sequence_name`)
) ;

INSERT INTO sequence_data (sequence_name) VALUE ('IPTP');

DROP TABLE IF EXISTS `transactions`;
create table `transactions` (
	`id` integer not null,
	`queue` varchar(30) not null,
	`trans_date` timestamp ,
	`request` text,
	`response` text,
	`source` varchar(30),
	`status` integer,
	`key` varchar(200),
	primary key (id, queue)
);

delimiter //
DROP FUNCTION IF EXISTS `nextval`;

delimiter //
CREATE FUNCTION `nextval` (`seq_name` varchar(100))
RETURNS bigint(20) NOT DETERMINISTIC
BEGIN
    DECLARE cur_val bigint(20);
    SELECT sequence_cur_value INTO cur_val
    FROM  sequence_data
    WHERE sequence_name = seq_name ;
 
    IF cur_val IS NOT NULL THEN
        UPDATE sequence_data
        SET
            sequence_cur_value = IF (
                (sequence_cur_value + sequence_increment) > sequence_max_value,
                IF (
                    sequence_cycle = TRUE,
                    sequence_min_value,
                    NULL
                ),
                sequence_cur_value + sequence_increment
            )
        WHERE
            sequence_name = seq_name
        ;
    END IF;
 
    RETURN cur_val;
END;

delimiter //
DROP FUNCTION IF EXISTS `currval`;

delimiter //
CREATE FUNCTION `currval`(`seq_name` varchar(100))
RETURNS bigint(20) NOT DETERMINISTIC
BEGIN
    DECLARE cur_val bigint(20);
 
    SELECT
        sequence_cur_value INTO cur_val
    FROM
        sequence_data
    WHERE
        sequence_name = seq_name;
 
    RETURN cur_val;
END;

delimiter //
DROP FUNCTION IF EXISTS `setval`;;

delimiter //
CREATE FUNCTION `setval` (`seq_name` varchar(100), `new_val` bigint(20))
RETURNS bigint(20) NOT DETERMINISTIC
BEGIN
    UPDATE
		sequence_data
	SET
		sequence_cur_value = new_val
    WHERE
        sequence_name = seq_name
    ;
 
    RETURN new_val;
END;

Primary key: (id, queue)

  • id - nextval() from sequence.
  • queue - varchar value, identifier for which app made the record in the database.
  • trans_date - date we received the synchronization request
  • request - xml/Json request that will be sent to the servers for sync.
  • response - xml/Json response given by that server after request was finished
  • source - name of the node from which we got this request (localId from esb2.json on that source server)
  • status - status of transaction. 2 means it is completed we can move on to the next one, 1 means still in queue.
  • key - Randomly generated by ESB on the server that got the initial request, always in the format XXXX-XXXX-XXXX-XXXX, letters and numbers

Internal synchronization process

  • ESB Listener receives a request in XML/JSON format from the app.

  • sync attribute

    • This boolean attribute at the root element of the XML/JSON lets us know where the request came from.
    • TRUE : This means that the request has already gone through the synchronization process and is being forwarded by the ESB library, and that nothing more needs to be done.
    • FALSE : This means that we have an original request from the app itself, and that we should proceed with synchronization.
    • This attribute should normally not be set by the app itself. ESB will set this attribute.
    • if the app sent the request, then the ESB will process the transaction and set it to TRUE before forwarding the request to all the nodes.
    • However, the app needs to check for its existence:
      • if it is FALSE, and we need to synchronize, we can call the library.
      • If it is TRUE, we can ignore that step and simply carry on with the implementation of the function.
  • Sending data to nodes

    1. ESB connects to the keyMapper node and receives the next id in the required sequence.
    2. “sync” attribute is set to TRUE
    3. XML/JSON request and required parameters (key, queue, source node) are packed into a Packet, which is sent to all nodes in specified in configuration file
    4. Request information is saved to the local ESB database, as well as the response from the app on the current server.
    5. Response from app is returned.

Configuring Java app for use with ESB Library

  • App must have a class that implements the ConnectionListener interface.

    • net.iptp.ESB.ESBConnectionListener –> for XML request

    • net.iptp.ESB.ESBJsonConnectionListener –> for JSON request

    • This interface itself has only one function onRequest

    • The purpose of this function is to be called at the moment that a request that has passed the synchronization library is sent to a node.

    • For example, ERP app uses this interface as a Request Dispatcher: the function is used to catch the XML request, do some validation and session checking, and then if everything is OK call the required function on the server.

  • Starting the Library:

    • Get Static instance of net.iptp.blockchain.ESB class

    • Call setListener function setListener(String queue, ESBConnectionListener listener) for XML setJsonListener(String queue, ESBJsonConnectionListener listener) for JSON

    • The first parameter is the name of the queue that we are using for the app (see database configuration step)

    • The second is the custom implementation of the ESBConnectionListener from the previous step.

  • Sending request via ESB library

    • Get a static instance of net.iptp.blockchain.ESB
    • Call setListener function sync(String queue, org.wc3.DOM.Document request) for XML sync(String queue, String jsonRequest) for JSON
    • The queue parameter is the name of the queue that we are using for the app (same as for setListener function)
    • The function will return either another response or null.
      • If the function returns NULL (i.e we have not gotten a response from the library), this means that the request has been sent to the queue, but the queue is bust processing previous requests and has not been able to catch the response in the time allocated. In this case, it is enough to wait a little bit - we will not get a response, but the transaction will be processed.
      • if the function returns something, this will be the XML response that was returned by the app
  • Order of functions

    • If you need synchronization in a function, you need to do the following structure by checking the value of “sync” parameter in the request ::
      • If it is FALSE → not sync , then we need to call the sync() function of the ESB
      • If the value is TRUE, this means that the request already came from the ESB library, in which case we skip the previous step and proceed with function implementation.

Example

Json Request

ESB esb = new ESB("conf/esb2.json");

esb.setJsonListener("IPTP", new ESBJsonConnectionListener() {
    @Override
    public void onRequest(String jsonRequest, String jsonResponse) {
    
    }
});

esb.sync("IPTP", "{\"object\":\"abc\":}");

or

ESB esb = ESB.getInstance();

esb.setJsonListener("IPTP", new ESBJsonConnectionListener() {
    @Override
    public void onRequest(String jsonRequest, String jsonResponse) {
    
    }
});

esb.sync("IPTP", "{\"object\":\"abc\":}");

Check sync

void syncData(String request, String target)
{
    JsonObject json = JSONHelper.parse(request);
    JsonElement sync = json.get("sync");
    if (sync == null || sync.getAsBoolean() == false) {
		String res = net.iptp.blockchain.ESB.getInstance().sync(target, request);
    }
}

XML-Base Request

ESB esb = new ESB("conf/esb2.json");

esb.setListener("IPTP", new ESBConnectionListener() {
    @Override
    public void onRequest(Document request, Document response) {

    }
});

esb.sync("IPTP", XMLHelper.CreateDocument());

or

ESB esb = ESB.getInstance();

esb.setListener("IPTP", new ESBConnectionListener() {
    @Override
    public void onRequest(Document request, Document response) {

    }
});

esb.sync("IPTP", XMLHelper.CreateDocument());

Check sync

void syncData(Document request, String target) 
{
    String sync = XMLHelper.evaluateXPathNode("/request",request).getAttribute("sync");
    if (sync != null && sync.equals("false")) {
       Document res = net.iptp.blockchain.ESB.getInstance().sync(target, request);
    }
}