Apache NiFi supports powerful and scalable directed graphs of data routing,
transformation, and system mediation logic. When paired with the CData
JDBC Driver for Salesforce, NiFi can work with live Salesforce data. This article shows
how to read data from a CSV file and perform batch operations (INSERT/UPDATE/DELETE)
on Salesforce data using the CData JDBC Driver for Salesforce in Apache NiFi (version 1.9.0 or later).
With built-in optimized data processing, the CData JDBC Driver offers
unmatched performance for interacting with live Salesforce data. When you issue
complex SQL queries to Salesforce, the driver pushes supported SQL operations,
like filters and aggregations, directly to Salesforce and utilizes the embedded
SQL engine to process unsupported operations client-side (often SQL functions
and JOIN operations). Its built-in dynamic metadata querying allows you to work
with and analyze Salesforce data using native data types.
About Salesforce Data Integration
Accessing and integrating live data from Salesforce has never been easier with CData. Customers rely on CData connectivity to:
- Access custom entities and fields, giving Salesforce users access to all of Salesforce.
- Create atomic and batch update operations.
- Read, write, update, and delete their Salesforce data.
- Leverage the latest Salesforce features and functionalities with support for SOAP API versions 30.0.
- See improved performance based on SOQL support to push complex queries down to Salesforce servers.
- Use SQL stored procedures to perform actions like creating, retrieving, aborting, and deleting jobs, uploading and downloading attachments and documents, and more.
Users frequently integrate Salesforce data with:
- other ERPs, marketing automation, HCMs, and more.
- preferred data tools like Power BI, Tableau, Looker, and more.
- databases and data warehouses.
For more information on how CData solutions work with Salesforce, check out our Salesforce integration page.
Getting Started
Generate a JDBC URL
We need a JDBC URL to connect to Salesforce data from Apache NiFi.
Built-in Connection String Designer
For assistance in constructing the JDBC URL, use the connection string designer built into the Salesforce JDBC Driver. Either double-click the JAR file or execute the JAR file from the command line.
java -jar cdata.jdbc.salesforce.jar
Fill in the connection properties and copy the connection string to the clipboard.
There are several authentication methods available for connecting to Salesforce: OAuth, Login (or basic), and SSO. The Login method requires you to have the username, password, and security token of the user.
OAuth Authentication (default)
The default authentication mechanism (and the one preferred by Salesforce) is OAuth. To use OAuth with CData's embedded OAuth application, leave the connection properties blank. If you have configured your own custom OAuth application with Salesforce (see the Help documentation for more information), set OAuthClientId, OAuthClientSecret, and CallbackURL to the values for your application. Set InitiateOAuth to the desired OAuth flow ("GETANDREFRESH" will have the connector manage the entire OAuth flow).
Login (or Basic) Authentication
If you do not wish to use OAuth authentication, you can use Login (or basic) authentication. Set AuthScheme to Basic, and set the User, Password, and SecurityToken properties. You can configure your security token in Salesforce.
SSO (single sign-on) Authentication
SSO (single sign-on) can be used by setting the SSOProperties, SSOLoginUrl, and SSOExchangeURL connection properties, which allow you to authenticate to an identity provider. See the "Getting Started" chapter in the Help documentation for more information.
Multi-Factor Authentication (MFA)
If your Salesforce org has MFA enforcement enabled, set MFACode to the time-based one-time passcode (TOTP) generated by your authenticator app (such as Salesforce Authenticator or Google Authenticator). MFACode applies to both OAuth and Login authentication flows.
π Using the built-in connection string designer to generate a JDBC URL (Salesforce is shown.)
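The connection string designer produces a URL of the form jdbc:salesforce: followed by semicolon-separated Property=Value pairs, as in the Database Connection URL used later in this article. As a rough illustration of that shape, here is a minimal Python sketch that assembles such a URL from the properties named above. The helper build_jdbc_url and the values myUser, myPassword, and myToken are hypothetical placeholders, not part of the driver; the designer remains the recommended way to build the real string.

```python
def build_jdbc_url(properties):
    """Join connection properties into a jdbc:salesforce: URL (illustrative helper)."""
    # Properties left unset (None or empty) are skipped, mirroring the
    # "leave the connection properties blank" case for embedded OAuth.
    pairs = [f"{name}={value}" for name, value in properties.items() if value]
    return "jdbc:salesforce:" + ";".join(pairs)


# OAuth with the embedded OAuth application, plus an MFA passcode placeholder.
oauth_url = build_jdbc_url({
    "InitiateOAuth": "GETANDREFRESH",
    "MFACode": "YourMFACode",
})

# Login (basic) authentication; every value here is a placeholder.
login_url = build_jdbc_url({
    "AuthScheme": "Basic",
    "User": "myUser",
    "Password": "myPassword",
    "SecurityToken": "myToken",
})

print(oauth_url)
print(login_url)
```

The sketch does no escaping, so it only suits values that contain no semicolons.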
Batch Operations (INSERT/UPDATE/DELETE) in Apache NiFi
The sample flow presented below is based on the following NiFi Processors:
- ListFile - Retrieves a listing of files from the local filesystem and creates a FlowFile for each retrieved file.
- FetchFile - Reads the content of the FlowFile received from the ListFile processor.
- PutDatabaseRecord - Uses a specified RecordReader to input records from a flow file coming from the FetchFile processor. These records are translated to SQL statements and executed as a single transaction.
- LogAttribute - Emits attributes of the FlowFile at the specified log level.
This is what our finished product looks like:
π NiFi GUI
Disclaimers
1. The column names of the CSV file must match the column names of the data source table records to be inserted/updated/deleted.
2. Apache NiFi versions earlier than 1.9.0 do not support the Maximum Batch Size property in the PutDatabaseRecord processor.
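Because of the first disclaimer, it can help to check a CSV header before sending the file through the flow. The following Python sketch is one possible pre-flight check, not part of NiFi or the driver. The function missing_columns, the sample CSV text, and the Account column set are illustrative assumptions; substitute the columns of your own target table. The comparison is exact and case-sensitive, which may be stricter than the driver requires.

```python
import csv
import io


def missing_columns(csv_text, table_columns):
    """Return CSV header names that are not columns of the target table."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return [name for name in header if name not in table_columns]


# Hypothetical CSV content with a misspelled column name (AnnualRevenu).
sample = "Name,Industry,AnnualRevenu\nAcme,Manufacturing,1000000\n"

# Assumed subset of the target table's columns, for illustration only.
account_columns = {"Id", "Name", "Industry", "AnnualRevenue"}

print(missing_columns(sample, account_columns))
```

An empty result means every header name was found in the assumed column set.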
Configurations
To perform batch INSERT, UPDATE, or DELETE operations, configure the NiFi processors similarly to the following:
- Configure the ListFile processor
Set the Input Directory property to the local folder path from which to pull the CSV files.
Set the File Filter property to a regular expression so that only files whose names match the expression are picked up.
For example, if the CSV file's full path is C:\Users\Public\Documents\InsertNiFi.csv, configure the properties as in the following image:
π config for ListFile
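To make the split between Input Directory and File Filter concrete, the Python sketch below separates the example path into its folder and file name and tests a candidate filter against file names. The filter value InsertNiFi\.csv is a hypothetical choice, and the sketch assumes the filter must match the whole file name; NiFi evaluates the expression with Java's regex engine, which agrees with Python's for a simple pattern like this one.

```python
import ntpath  # Windows-style path handling that works on any platform
import re

full_path = r"C:\Users\Public\Documents\InsertNiFi.csv"

# Input Directory is the folder part; File Filter is applied to the file name.
input_directory = ntpath.dirname(full_path)
file_name = ntpath.basename(full_path)

# Hypothetical File Filter value: the dot is escaped so it matches literally.
file_filter = r"InsertNiFi\.csv"


def is_picked_up(name, pattern):
    """True when the whole file name matches the filter expression."""
    return re.fullmatch(pattern, name) is not None


print(input_directory)
print(is_picked_up(file_name, file_filter))
print(is_picked_up("UpdateNiFi.csv", file_filter))
```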
- Configure the FetchFile processor
Leave the FetchFile processor's properties at their default values:
π config for FetchFile
- Configure the PutDatabaseRecord processor
INSERT Operation
Configure the PutDatabaseRecord processor similarly to the following to perform batch INSERT operations:
π config for PutDatabaseRecord-INSERT
- Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
- Set the Statement Type property to INSERT.
- Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API.
| Property | Value |
| --- | --- |
| Database Connection URL | jdbc:salesforce:InitiateOAuth=GETANDREFRESH;MFACode=YourMFACode |
| Database Driver Class Name | cdata.jdbc.salesforce.SalesforceDriver |
- Set the Catalog Name property to the name of the catalog that your table is part of.
- Set the Schema Name property to the name of the schema that your table is part of.
- Set the Table Name property to the name of the table that you want to INSERT into.
- Set the Maximum Batch Size property to the maximum number of records that you want to be included in a single batch.
π config for PutDatabaseRecord-INSERT
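Conceptually, each CSV record becomes one set of parameters for a SQL statement, and Maximum Batch Size caps how many records go into one batch. The Python sketch below illustrates that idea only; it is not NiFi's implementation, and the functions insert_statement and batches, the Account table, and the sample records are assumptions made for the example.

```python
def insert_statement(table, columns):
    """Build a parameterized INSERT for the given table and column names."""
    placeholders = ", ".join("?" for _ in columns)
    return f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"


def batches(records, max_batch_size):
    """Yield consecutive slices holding at most max_batch_size records."""
    for start in range(0, len(records), max_batch_size):
        yield records[start:start + max_batch_size]


# Hypothetical CSV header and rows.
columns = ["Name", "Industry"]
records = [
    ("Acme", "Manufacturing"),
    ("Globex", "Energy"),
    ("Initech", "Technology"),
]

print(insert_statement("Account", columns))
for batch in batches(records, 2):
    # Each batch would be sent to the database as one unit.
    print(len(batch), batch)
```

With three records and a batch size of 2, the sketch yields one batch of two records and one batch of one.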
UPDATE Operation
Configure the PutDatabaseRecord processor similarly to the following to perform batch UPDATE operations:
- Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
π config for PutDatabaseRecord-INSERT
- Set the Statement Type property to UPDATE.
- Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API. Use the same Database Connection URL format as seen above.
- Set the Catalog Name property to the name of the catalog that your table is part of.
- Set the Schema Name property to the name of the schema that your table is part of.
- Set the Table Name property to the name of the table that you want to UPDATE.
- Set the Update Keys property to the names of the columns that are required for an UPDATE.
- Set the Maximum Batch Size property to the maximum number of records that you want to be included in a single batch.
π config for PutDatabaseRecord-UPDATE
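The role of Update Keys is easiest to see in the shape of the resulting statement: key columns identify the row, and the remaining columns are the values to change. The Python sketch below is a simplified illustration of that split, not NiFi's actual statement generator. The function update_statement, the Account table, and the column names are assumptions for the example; the key list is written as a comma-separated string, as the property accepts.

```python
def update_statement(table, columns, update_keys):
    """Build a parameterized UPDATE: non-key columns in SET, keys in WHERE."""
    set_columns = [c for c in columns if c not in update_keys]
    set_clause = ", ".join(f"{c} = ?" for c in set_columns)
    where_clause = " AND ".join(f"{k} = ?" for k in update_keys)
    return f"UPDATE {table} SET {set_clause} WHERE {where_clause}"


# Hypothetical CSV header; Id identifies the record to update.
columns = ["Id", "Name", "Industry"]

# Update Keys value parsed from a comma-separated string.
update_keys = [key.strip() for key in "Id".split(",")]

print(update_statement("Account", columns, update_keys))
```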
DELETE Operation
Configure the PutDatabaseRecord processor similarly to the following to perform batch DELETE operations:
- Set the Record Reader property to a CSV Reader Controller Service. Configure the CSV Reader Controller Service to match the format of your CSV file.
π config for PutDatabaseRecord-UPDATE
- Set the Statement Type property to DELETE.
- Set the Database Connection Pooling Service to the DBCPConnectionPool that holds the driver configuration. Note that the driver should be configured to use the Bulk API. Use the same Database Connection URL format as seen above.
- Set the Catalog Name property to the name of the catalog that your table is part of.
- Set the Schema Name property to the name of the schema that your table is part of.
- Set the Table Name property to the name of the table that you want to DELETE from.
- Unlike the INSERT and UPDATE statement types, the DELETE statement type does not expose a Maximum Batch Size property. However, the operations are still processed in batches. Unless changed, the maximum number of records per batch is the default value of 2000. To change the Maximum Batch Size used for DELETE operations, change the Statement Type to INSERT or UPDATE, change the value of the Maximum Batch Size property, and click Apply Changes. Then reopen the processor's configuration, change the Statement Type back to DELETE, and click Apply Changes.
π config for PutDatabaseRecord-DELETE
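To estimate how many batches a DELETE run will produce, divide the record count by the batch size and round up. The short Python sketch below does that arithmetic, taking the default of 2000 stated above as given. The function batch_count and the record counts are illustrative assumptions.

```python
import math

# Default maximum number of records per batch, as stated in this article.
DEFAULT_MAX_BATCH_SIZE = 2000


def batch_count(record_count, max_batch_size=DEFAULT_MAX_BATCH_SIZE):
    """Number of batches needed for record_count records (rounded up)."""
    return math.ceil(record_count / max_batch_size)


print(batch_count(5000))       # with the default batch size
print(batch_count(5000, 500))  # after lowering Maximum Batch Size to 500
```

For a hypothetical file of 5000 records, the default gives 3 batches, while a batch size of 500 gives 10.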
- Configure the LogAttribute processor
Finally, configure the LogAttribute processor by specifying the Attributes that you would like to log or ignore, as well as the log level, based on your use case.
Free Trial & More Information
Download a free, 30-day trial of the CData
JDBC Driver for Salesforce and start working with your live Salesforce data in Apache NiFi. Reach out to our
Support Team if you have any questions.