By Arround The Web

How to Process Data in Real-Time with MongoDB Change Streams

MongoDB Change Streams let an application receive notifications as soon as the data in your MongoDB databases and collections is modified. With change streams, you can register handlers that run each time a change event takes place. Change streams are built on the MongoDB oplog, which is a log of every modification made to the data in a MongoDB replica set. Each change to a stored document is recorded in the oplog. Change streams read the oplog and send notifications to the programs that have subscribed to them. Because change streams are only available on replica sets and sharded clusters, this guide first sets up a three-member replica set.

Create Directories for Replicas

Let’s start by creating the replica set members. For this, we create 3 folders named rs1, rs2, and rs3 in the “RS” folder of drive “C”. Each of these folders contains a “data” folder, where the data files of that member are stored, and a “log” folder for its log file. The 3 commands below start the three members one after another. All of them join the same replica set, rs1.

The --dbpath option specifies the folder where the MongoDB data files are kept, while the --logpath option defines the location of the MongoDB log file. The --replSet option specifies the name of the replica set, which is rs1 for all three members. The --storageEngine option specifies the storage engine that MongoDB uses, WiredTiger. The --port option sets the port on which each server listens for connections: 27018 for the first member, 27019 for the second, and 27020 for the third.

mongod --dbpath "C:\RS\rs1\data" --logpath "C:\RS\rs1\log\mongod.log" --replSet rs1 --storageEngine=wiredTiger --port 27018

mongod --dbpath "C:\RS\rs2\data" --logpath "C:\RS\rs2\log\mongod.log" --replSet rs1 --storageEngine=wiredTiger --port 27019

mongod --dbpath "C:\RS\rs3\data" --logpath "C:\RS\rs3\log\mongod.log" --replSet rs1 --storageEngine=wiredTiger --port 27020
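The three commands differ only in the member folder and the port, so they can also be generated instead of typed. The following is a minimal sketch, assuming Python 3 is installed. The helper name build_mongod_command and the members mapping are illustrative and not part of MongoDB. The sketch also creates the “data” and “log” folders, since mongod expects them to exist.

```python
from pathlib import Path
import tempfile


def build_mongod_command(base_dir, member, port, repl_set="rs1"):
    """Create the member's data and log folders and return the mongod argv list."""
    member_dir = Path(base_dir) / member
    data_dir = member_dir / "data"
    log_dir = member_dir / "log"
    data_dir.mkdir(parents=True, exist_ok=True)
    log_dir.mkdir(parents=True, exist_ok=True)
    return [
        "mongod",
        "--dbpath", str(data_dir),
        "--logpath", str(log_dir / "mongod.log"),
        "--replSet", repl_set,
        "--storageEngine=wiredTiger",
        "--port", str(port),
    ]


# Folder name -> port, as used in this guide.
members = {"rs1": 27018, "rs2": 27019, "rs3": 27020}

# A temporary folder stands in for C:\RS so the sketch runs anywhere.
base_dir = tempfile.mkdtemp()
commands = [build_mongod_command(base_dir, name, port) for name, port in members.items()]
for command in commands:
    print(" ".join(command))
```

Each list in commands can be handed to subprocess.Popen to start that member. Replace base_dir with the real “RS” folder when doing so.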

Start Mongo Shell for First Replica

Navigate to the “bin” folder of the MongoDB shell and open the Command Prompt from there. Then, run the “mongosh” executable using the instruction below, which connects to the port of the first member, i.e. 27018.

mongosh --port 27018

The shell is now connected to the first member on port 27018.


Define Replicas

A replica set is a group of MongoDB servers that keep the same data in sync. As a result, the data stays available even if one of the servers fails. The statement below first specifies the replica set’s name as rs1 and then defines its members. Each member is identified by an ID and a host address. The IDs in this instance are 0, 1, and 2, and the hosts are localhost:27018, localhost:27019, and localhost:27020. The statement only stores the configuration in the “config” variable. The replica set is built, and the members begin syncing data, once this configuration is passed to the initiate command.

config={_id:"rs1",members:[{_id:0,host:"localhost:27018"},{_id:1,host:"localhost:27019"},{_id:2,host:"localhost:27020"}]}

Initiate Configuration

The rs.initiate() function applies the configuration defined above, which creates the replica set and connects its members.

rs.initiate(config)
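The same two steps can be scripted from Python. This is a rough sketch, assuming the pymongo package is installed and that the client passed to initiate_replica_set is connected directly to one member. Both helper names are illustrative. replSetInitiate is the database command that rs.initiate() wraps.

```python
def build_replset_config(name, hosts):
    """Return a replica set configuration document with sequential member IDs."""
    return {
        "_id": name,
        "members": [{"_id": i, "host": host} for i, host in enumerate(hosts)],
    }


def initiate_replica_set(client, config):
    """Send the replSetInitiate command through an already connected MongoClient.

    Assumption: `client` was created with something like
    MongoClient("localhost", 27018, directConnection=True).
    """
    return client.admin.command("replSetInitiate", config)


config = build_replset_config(
    "rs1", ["localhost:27018", "localhost:27019", "localhost:27020"]
)
print(config)
```

The printed dictionary has the same shape as the config object typed into the shell, so the member list only needs to be maintained in one place.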

Display Status

After initiating the replica set, the “status” instruction can be used to display the current state of each member. Right after initiation, the shell prompt may still show the connected rs1 member as “direct: secondary” until the election of a primary has finished.

rs.status()

The output of the status instruction shows that localhost:27018 is now the “Primary”.

In contrast, localhost:27019 and localhost:27020 are listed as secondary members.
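The status output is long, so it can help to reduce it to the state of each member. The sketch below assumes the reply comes from the replSetGetStatus database command, for example via client.admin.command("replSetGetStatus") in pymongo. The sample reply is illustrative and heavily trimmed, and the helper names are not part of any library.

```python
def summarize_members(status):
    """Map each member's host name to its state string from a replSetGetStatus reply."""
    return {member["name"]: member["stateStr"] for member in status["members"]}


def find_primary(status):
    """Return the host name of the PRIMARY member, or None while an election is pending."""
    for name, state in summarize_members(status).items():
        if state == "PRIMARY":
            return name
    return None


# Illustrative, heavily trimmed reply; a real one carries many more fields.
sample_status = {
    "set": "rs1",
    "members": [
        {"_id": 0, "name": "localhost:27018", "stateStr": "PRIMARY"},
        {"_id": 1, "name": "localhost:27019", "stateStr": "SECONDARY"},
        {"_id": 2, "name": "localhost:27020", "stateStr": "SECONDARY"},
    ],
}
print(summarize_members(sample_status))
print(find_primary(sample_status))
```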

Insert Data via Primary Server

Once the election has finished, the shell prompt shows rs1 as “direct: primary”. The “show dbs” instruction lists the built-in databases of the primary server.

show dbs

Create a new “HospitalDB” database via the “use” instruction.

use HospitalDB

After that, insert a document into the “patients” collection on your primary server using the insertOne() function. The collection is created automatically by the first insert.

db.patients.insertOne({ID: 1001, Name: "James"})

Fetch Data in Secondary Server

Run the MongoDB shell for the secondary server, which is located at localhost 27019, via the command prompt.

mongosh --port 27019

The database created on the primary server is replicated to this secondary server, where it can be listed and selected.

show dbs

use HospitalDB

At this point, the shell connected to the secondary server can neither write nor read records in the database, as the outputs of the following “insertOne” and “find” queries show.

db.patients.insertOne({ID: 1002, Name: "Christina"})

db.patients.find()

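A secondary never accepts writes, but reads can be enabled for this connection. To do so, call the “setReadPref()” function with the argument “primaryPreferred”. This does not turn the server into a primary; it only changes the read preference of the shell’s connection so that queries against the secondary are allowed.

db.getMongo().setReadPref('primaryPreferred')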

After using the above instruction, you can search for the inserted documents.

db.patients.find()
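An application usually sets the read preference in its connection string instead of calling setReadPref() by hand. The following sketch builds such a string for the three members used in this guide. The helper name build_replica_set_uri is an assumption for illustration, while replicaSet and readPreference are standard connection string options.

```python
from urllib.parse import urlencode


def build_replica_set_uri(hosts, repl_set, read_preference="primary"):
    """Build a MongoDB connection string that lists every member of the set."""
    options = urlencode({"replicaSet": repl_set, "readPreference": read_preference})
    return "mongodb://" + ",".join(hosts) + "/?" + options


hosts = ["localhost:27018", "localhost:27019", "localhost:27020"]
uri = build_replica_set_uri(hosts, "rs1", "secondaryPreferred")
print(uri)
```

Passing this string to pymongo.MongoClient lets the driver discover the current primary on its own. Writes such as insert_one() then go to the primary, while reads follow the chosen read preference.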

Process Data in Real Time with Change Streams

Change streams report operations in real time as they are performed on the primary server. For instance, we insert another record into the “patients” collection using the insertOne() function on our primary server.

db.patients.insertOne({ID: 1003, Name: "Tom"})
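For an insert like the one above, a change stream delivers an event document to every subscriber. The sketch below shows a trimmed, illustrative event and a small function that summarizes it. The field names operationType, ns, documentKey, and fullDocument are standard change event fields, whereas the resume token and “_id” values are placeholders and describe_change is a hypothetical helper.

```python
def describe_change(change):
    """Return a one-line summary of a change event based on its operationType."""
    namespace = "{db}.{coll}".format(**change["ns"])
    operation = change["operationType"]
    if operation == "insert":
        return f"insert into {namespace}: {change['fullDocument']}"
    if operation == "delete":
        return f"delete from {namespace}: {change['documentKey']}"
    return f"{operation} on {namespace}"


# Illustrative, trimmed event; the resume token and _id values are placeholders.
sample_event = {
    "_id": {"_data": "<resume token>"},
    "operationType": "insert",
    "ns": {"db": "HospitalDB", "coll": "patients"},
    "documentKey": {"_id": "<ObjectId>"},
    "fullDocument": {"_id": "<ObjectId>", "ID": 1003, "Name": "Tom"},
}
print(describe_change(sample_event))
```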

The Python program below establishes a connection to a MongoDB database and uses a change stream to continuously monitor the “patients” collection for modifications. The script first imports the MongoClient class from the pymongo package, which is used to connect to a MongoDB database. Every time a new record is added to the “patients” collection, the process_insert function is invoked. The main function first establishes a connection to the MongoDB server on port 27018 of the local host.

Then, it obtains the HospitalDB database and its “patients” collection, and opens a change stream on that collection. Since the change stream is set up with an empty pipeline, every modification to the collection produces an event. The try block iterates over the change stream and handles the events as they arrive. For each change, the program checks whether the operation type is “insert”. If it is, the program calls the process_insert function to print the record to the console.

Any errors that arise are caught and printed to the console by the except block. The “finally” block closes the change stream. The if __name__ == "__main__": guard calls main() only when the file is executed as a script.

from pymongo import MongoClient


def process_insert(change):
    # fullDocument holds the newly inserted document
    print("Inserted document:", change["fullDocument"])


def main():
    client = MongoClient("mongodb://localhost:27018")
    db = client["HospitalDB"]
    collection = db["patients"]
    pipeline = []  # You can add aggregation stages to filter and process changes
    change_stream = collection.watch(pipeline=pipeline)
    try:
        # Blocks and yields one event for each change to the collection
        for change in change_stream:
            if change["operationType"] == "insert":
                process_insert(change)
    except Exception as e:
        print("An error occurred:", e)
    finally:
        change_stream.close()


if __name__ == "__main__":
    main()
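The comment on the pipeline variable hints that filtering can be moved into the change stream itself. Here is one possible sketch of that idea, assuming the collection argument is a pymongo Collection. The names build_insert_pipeline, watch_inserts, and min_patient_id are hypothetical. The $match stage makes the server send only insert events, so the check on the operation type in Python is no longer needed.

```python
def build_insert_pipeline(min_patient_id=None):
    """Return aggregation stages that keep only insert events.

    min_patient_id is a hypothetical extra filter on the ID field used in this guide.
    """
    match = {"operationType": "insert"}
    if min_patient_id is not None:
        match["fullDocument.ID"] = {"$gte": min_patient_id}
    return [{"$match": match}]


def watch_inserts(collection, handler, pipeline=None):
    """Call handler(document) for every event the stream delivers.

    `collection` is expected to be a pymongo Collection; watch() returns a
    change stream that also works as a context manager and closes on exit.
    """
    with collection.watch(pipeline=pipeline or build_insert_pipeline()) as stream:
        for change in stream:
            handler(change["fullDocument"])


print(build_insert_pipeline(1002))
```

Calling watch_inserts(collection, print) with the “patients” collection would block and print each newly inserted document until the program is interrupted.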

Execute the Python code file via the “python” command in the Command Prompt. Every time a document is inserted through the primary server, it is displayed in this Command Prompt window in real time.

python Streams.py

Conclusion

This guide has illustrated every step needed to process data in real time with MongoDB Change Streams. We started by configuring a replica set, inserting records on the primary server, and fetching them on a secondary server. In the end, we used a Python script to perform real-time processing with Change Streams.


Source: linuxhint.com
