To use Data Exchange for a transformation with Kafka as the target, you need to have a pre-defined XSD schema structure to represent the Kafka message structure. In some cases, you may want to have the same XSD schema structure for the target Kafka as your source schema. For such cases, Data Exchange provides a function in DDW to generate an XSD schema based on either an Enterprise Database Server schema or an RDMS schema.
When the transformation target is Kafka, Data Exchange acts as a Kafka producer. After the data is transformed and sent to Kafka as messages, you can create your own consumer application to consume the messages. To consume the messages, the consumer program needs to de-serialize them. Data Exchange provides another function in DDW to generate helper classes that you can use in your consumer program to de-serialize the messages. The classes are generated in either Java or C#, depending on the programming language of the consumer. The generated classes include a generic helper class for Data Exchange messages in general and specific helper classes for the individual tables.
To generate an XSD file in DDW:
On the Schema View, select the required schema.
Right-click the schema and select Export Schema.
The Export Schema window appears.
Enter or edit the output path for the generated XSD file.
Alternatively, click Browse, and select or create an output path.
Enter or edit the name of the XSD file.
From the Save As Type drop-down list, select “XSD”.
Select Load Schema if you want to load the XSD file to DDW after it is generated.
Click OK.
You can now define a transformation from the Enterprise Database Server schema or the RDMS schema to the generated Kafka schema using the auto map option.
Note: DDW generates an XSD file with independent XSD elements for the table and its versions available in the RDMS schema.
Kafka Message Format
To generate Kafka messages efficiently and to simplify their use in consumers, Data Exchange uses a unified Kafka message format. This unified format allows messages that use different transmission options to be represented in a single format. Each message in the Kafka queue consists of two parts: a key and a message value.
The message value in the Kafka queue is the Data Exchange Kafka message, and the key is BatchId. The Kafka message includes metadata such as TransformationId, SourceOperation, TargetOperation, and so on, that provides information regarding the transformation. It also contains the transformed data in BeforeImage and AfterImage. This provides information on the data that is updated, based on which further processing can be done when the Consumer obtains the data from the queue.
The message format includes a message header, followed by a BatchUnits element that consists of several BatchUnit elements, each of which in turn consists of multiple MessageUnit elements.
Note: The value of the field DETransmissionType is always “Batch”.
The format of a Kafka message is as follows:
<DEKafkaMessage>
  <Header>
    <BatchId></BatchId>
    <ThreadId></ThreadId>
    <MessageUniqueId></MessageUniqueId>
    <MessageType></MessageType>
    <DETransmissionType>Batch</DETransmissionType>
    <TransformationId></TransformationId>
    <ProviderId></ProviderId>
    <MessageFormatVersion></MessageFormatVersion>
  </Header>
  <BatchUnits>
    <BatchUnit>*
      <SourceTransactionId></SourceTransactionId>#
      <MessageUnits>
        <MessageUnit>*
          <SourceOperation></SourceOperation>
          <TargetOperation></TargetOperation>
          <OrderInBatchUnit></OrderInBatchUnit>
          <SchemaName></SchemaName>
          <ClassifierName></ClassifierName>
          <KeysOfTarget>
            <KeyOfTarget></KeyOfTarget>*
          </KeysOfTarget>
          <SchemaLastModifiedDate></SchemaLastModifiedDate>
          <Schema />
          <MessagePayload>
            <BeforeImage></BeforeImage>
            <AfterImage></AfterImage>
          </MessagePayload>
        </MessageUnit>
      </MessageUnits>
    </BatchUnit>
  </BatchUnits>
</DEKafkaMessage>
* The element can occur multiple times.
# Present only for a CDT. SourceTransactionId is an identifier for the transaction in the database audit trail. Schema is present for both CDT and BDT but is always empty.
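The nesting described above (header, then BatchUnit elements, then MessageUnit elements) can be walked with the standard Java DOM API. The following is only a minimal sketch: it assumes the message value arrives as XML text in the layout shown above, and the class name DEKafkaMessageWalker and its summarize method are hypothetical names chosen for illustration. In a real consumer, the helper classes generated by DDW would normally take the place of this hand-written parsing.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, not part of Data Exchange.
class DEKafkaMessageWalker {

    /** Returns one line per MessageUnit: "BatchId/OrderInBatchUnit TargetOperation ClassifierName". */
    static List<String> summarize(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Refuse DOCTYPE declarations, since the message comes from outside the program.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Element root = doc.getDocumentElement();
            String batchId = text(root, "BatchId");

            List<String> lines = new ArrayList<>();
            // "BatchUnit" matches only elements with exactly that name, not "BatchUnits".
            NodeList batchUnits = root.getElementsByTagName("BatchUnit");
            for (int i = 0; i < batchUnits.getLength(); i++) {
                Element batchUnit = (Element) batchUnits.item(i);
                NodeList messageUnits = batchUnit.getElementsByTagName("MessageUnit");
                for (int j = 0; j < messageUnits.getLength(); j++) {
                    Element unit = (Element) messageUnits.item(j);
                    lines.add(batchId + "/" + text(unit, "OrderInBatchUnit")
                            + " " + text(unit, "TargetOperation")
                            + " " + text(unit, "ClassifierName"));
                }
            }
            return lines;
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a readable DEKafkaMessage", e);
        }
    }

    /** Text of the first descendant with the given tag, or "" if it is absent. */
    private static String text(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
    }
}
```

Calling DEKafkaMessageWalker.summarize(value) on a message value returns one entry per MessageUnit, in document order, which is usually enough to confirm that a consumer sees the batch structure it expects.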
Table G.1. Kafka Message Header
Field | C# Type | Description |
BatchId | Long | BatchId is the key of the Kafka message. If a batch is too large for one message, for example when a long transaction is detected, it is sent as several Kafka messages. All messages of the same batch have the same BatchId and are sent to the same partition. For a CDT, BatchId increases by 1 for each message, except during a long transaction: when a long transaction spans multiple messages, the BatchId stays the same while OffsetInBatch increases, so all of its messages go to the same partition. The MessageUniqueId is still unique for each message. For a BDT, BatchId increases by 1 each time loading of another source table starts. Records in the same table are handled like a long transaction in a CDT: the BatchId stays the same while OffsetInBatch increases. |
ThreadId | Int | For a CDT there is only one commit thread, so the value of ThreadId is always zero. For a BDT you can configure multiple processes to commit records to the target system, based on the setting ‘Maximum Degree of Parallelization’. ThreadId is the ID of the committing thread. |
MessageUniqueId | String | This value uniquely identifies a Kafka message. It is in the format {BatchId}-{ThreadId}-{OffsetInBatch}, to ensure that it is unique for each message. |
MessageType | String | If the batch is too large to be sent as a single Kafka message, then MessageType can be BeginBatch, BatchContent, or EndBatch. These values indicate the batch scope. |
DETransmissionType | String | This is always set to Batch. |
TransformationId | GUID | This value indicates the transformation that generates the message. |
ProviderId | String | This value indicates the Data Exchange Runtime Service instance that sends the message. Note: This value is generated automatically when the tracking database is initialized, and cannot be changed. |
MessageFormatVersion | String | This is the version of the Data Exchange Kafka Message. Using this version, the consumer can check the compatibility of the Data Exchange Kafka messages. |
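Because MessageUniqueId has the form {BatchId}-{ThreadId}-{OffsetInBatch}, a consumer can split it to put the fragments of a large batch back in order. The following is a minimal sketch, assuming the three parts are non-negative decimal integers (the table above does not state their exact encoding). The class name MessageUniqueId and the method inBatchOrder are hypothetical and are not part of the generated helper classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical value class for the {BatchId}-{ThreadId}-{OffsetInBatch} format.
final class MessageUniqueId {
    final long batchId;
    final int threadId;
    final long offsetInBatch;

    private MessageUniqueId(long batchId, int threadId, long offsetInBatch) {
        this.batchId = batchId;
        this.threadId = threadId;
        this.offsetInBatch = offsetInBatch;
    }

    /** Splits the identifier into its three parts (assumed to be decimal integers). */
    static MessageUniqueId parse(String value) {
        String[] parts = value.split("-");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected {BatchId}-{ThreadId}-{OffsetInBatch}: " + value);
        }
        return new MessageUniqueId(
                Long.parseLong(parts[0]),
                Integer.parseInt(parts[1]),
                Long.parseLong(parts[2]));
    }

    /** Parses the identifiers and sorts them by BatchId, then by OffsetInBatch. */
    static List<MessageUniqueId> inBatchOrder(List<String> values) {
        List<MessageUniqueId> ids = new ArrayList<>();
        for (String value : values) {
            ids.add(parse(value));
        }
        ids.sort(Comparator
                .comparingLong((MessageUniqueId id) -> id.batchId)
                .thenComparingLong(id -> id.offsetInBatch));
        return ids;
    }
}
```

Since all messages with the same BatchId go to the same partition, Kafka already preserves their order within that partition. Sorting as shown is mainly useful for checking or for buffering fragments outside the consumer loop.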
BatchUnits
In Batch mode, a batch consists of multiple transactions. The batch size can be configured in DataExchangeRuntimeService.exe.config using two configuration items:
BDTTargetBatchSize: the number of records in a BDT batch.
CDTTargetBatchSize: the number of transactions in a batch when running a CDT. A transaction may have more than one MessageUnit if it performs multiple database changes. If the number of updates or message units exceeds the LongTransactionFragmentSize, a new message is generated. This can be used to control the maximum size of the messages generated by Data Exchange.
BatchUnit
A BatchUnit is aligned to the source transaction.
Field | Type | Description |
SourceTransactionId | String | For a CDT, this is a unique identifier of the source transaction. It links to the transaction in the audit trail, and all database changes made by the same transaction have the same SourceTransactionId. For OS 2200 as the source, it is the Step ID (18 hexadecimal digits) + Host ID (white space or A to L) + long transaction parcel ID (3 digits). This is not applicable for a BDT, as there is no audit trail associated with a BDT. |
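The OS 2200 layout of SourceTransactionId described above can be split into its parts with a regular expression. The following is a minimal sketch, assuming the three parts are simply concatenated with no separators and that "white space" means a single space character; neither detail is confirmed by the table. The class name Os2200TransactionId is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the OS 2200 form of SourceTransactionId.
final class Os2200TransactionId {
    // Step ID: 18 hex digits. Host ID: a space or A to L. Parcel ID: 3 digits.
    private static final Pattern LAYOUT =
            Pattern.compile("([0-9A-Fa-f]{18})([ A-L])([0-9]{3})");

    final String stepId;
    final String hostId;   // empty string when the host position holds a space
    final int parcelId;

    private Os2200TransactionId(String stepId, String hostId, int parcelId) {
        this.stepId = stepId;
        this.hostId = hostId;
        this.parcelId = parcelId;
    }

    /** Splits a 22-character identifier, or throws if it does not fit the assumed layout. */
    static Os2200TransactionId parse(String value) {
        Matcher m = LAYOUT.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected SourceTransactionId: " + value);
        }
        return new Os2200TransactionId(
                m.group(1), m.group(2).trim(), Integer.parseInt(m.group(3)));
    }
}
```

A consumer could use the parcel ID to tell apart the fragments of one long transaction while grouping them by Step ID and Host ID.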
MessageUnit
MessageUnit is the smallest unit in a Data Exchange Kafka message. It contains an audit entry or the data of one source record/database change. There can be more than one MessageUnit in a BatchUnit.
Field | Type | Description |
SourceOperation | String | For a BDT the default is INSERT. For a CDT the default is MERGE (for simple transformations). Values can be INSERT, UPDATE, DELETE, MERGE, DELETEALL. |
TargetOperation | String | This is the same as the SourceOperation unless the transformation changes the operation. |
OrderInBatchUnit | Int | This is used to show the sequence of the MessageUnit in one BatchUnit. |
SchemaName | String | This is the GUID for the schema (as stored in the tracking database) and the name of the schema. |
ClassifierName | String | The name of the source table. |
KeysOfTarget | XML Array of elements | These are the column names that identify the record for the operation. Every MessageUnit contains the KeysOfTarget. However, it will be empty in the following circumstances:
Note: This is only in the case of RDMS. For Enterprise Database Server, the BeforeImage and AfterImage will have all the fields. |
SchemaLastModifiedDate | DateTime | This is the date when the schema was last changed. |
Schema | String | This is always empty for both BDT and CDT. |
BeforeImage | String | JSON-formatted. Not present if a DELETE <table> ALL command deletes the entire table. |
AfterImage | String | JSON-formatted. Not present if a DELETE <table> ALL command deletes the entire table. |
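The fields above are what a consumer needs to replay a MessageUnit against its own copy of a table. The following is a minimal sketch of that idea, not a description of how Data Exchange itself applies changes. It assumes that BeforeImage and AfterImage have already been de-serialized from JSON into maps of column name to value, that a DELETE identifies its row through BeforeImage, and that INSERT, UPDATE, and MERGE can all be treated as an upsert of AfterImage. The class name TableReplica is hypothetical, and the sketch rejects an empty KeysOfTarget rather than guessing how to handle it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory copy of one target table, keyed by the KeysOfTarget values.
final class TableReplica {
    private final Map<List<String>, Map<String, String>> rows = new HashMap<>();

    /** Applies one MessageUnit; the images are already-parsed column-to-value maps. */
    void apply(String targetOperation, List<String> keysOfTarget,
               Map<String, String> beforeImage, Map<String, String> afterImage) {
        switch (targetOperation) {
            case "DELETEALL":
                rows.clear();                       // no images are present for this operation
                break;
            case "DELETE":
                rows.remove(keyOf(keysOfTarget, beforeImage));
                break;
            case "INSERT":
            case "UPDATE":
            case "MERGE":
                // Assumption: all three are handled as an upsert of the AfterImage.
                rows.put(keyOf(keysOfTarget, afterImage), new HashMap<>(afterImage));
                break;
            default:
                throw new IllegalArgumentException("Unknown TargetOperation: " + targetOperation);
        }
    }

    int size() {
        return rows.size();
    }

    /** Looks up a row by its key values, given in KeysOfTarget order. */
    Map<String, String> row(String... keyValues) {
        return rows.get(Arrays.asList(keyValues));
    }

    /** Builds the row key from the key columns of an image. */
    private static List<String> keyOf(List<String> keysOfTarget, Map<String, String> image) {
        if (keysOfTarget == null || keysOfTarget.isEmpty() || image == null) {
            throw new IllegalArgumentException("This sketch needs KeysOfTarget and an image");
        }
        List<String> key = new ArrayList<>();
        for (String column : keysOfTarget) {
            key.add(image.get(column));
        }
        return key;
    }
}
```

Processing the MessageUnit elements of each BatchUnit in OrderInBatchUnit order, and calling apply once per unit, keeps the replica consistent with the sequence of changes in the source transaction.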