How to Process Data in Real-Time with MongoDB Change Streams
Create Directories for Replicas
Let’s move toward the process of creating replicas in MongoDB. But for this, we will create 3 folders in the “RS” folder of Drive “C” named rs1, rs2, and rs3. All these 3 folders for replicas contain their “data” folders, where the data files for MongoDB replicas will be stored. The below-mentioned 3 commands are utilized to start MongoDB replica sets rs1, rs2, and rs3 one after another.
The –dbpath option specifies the folder where the MongoDB records will be kept, while the –logpath option defines the location of the MongoDB log file. The port 27018 of the MongoDB server will be open for connections. The replica set names, RS1, RS2, and RS3, are specified using the -replSet option. The storage engine that MongoDB will utilize, WiredTiger, is specified via the –storageEngine option. 27018 for rs1, 27019 for rs2, and 27020 for rs3 are the ports that the MongoDB server will listen on while accepting connections, respectively.
Start Mongo Shell for First Replica
Navigate to the “bin” folder of the MongoDB shell and run the Command Prompt from there. Then, run MongoDB “mongosh” exe file using the below instruction, which has been using the port for 1st replica i.e. 27018 for rs1.
Now, the shell for replica rs1 has been launched.
Define Replicas
A collection of synced MongoDB servers is known as a replica set. As a result, even if one of the servers fails, the data in the replica set remains constantly consistent. The query below initially specifies the replica set’s name as rs1. The replica set’s members are then defined. A hostname and an ID are used to identify each member. IDs in this instance are 0, 1, and 2. The members in this instance are localhost: 27018, 27019, and 27020. The MongoDB server will build the replica set and begin syncing the data amongst the members as soon as this query got executed.
Initiate Configuration
The initiate function command has been utilized to configure the configuration in the above illustration for creating and connecting replicas.
Display Status
After initiating the configuration of replicas, the “status” instruction can be used to display the current working state of each replica server. While using this query, the “rs1” server has been portrayed as the secondary replica as per the “direct: secondary” keyword.
The output of the status instruction has been displaying that the localhost 27018 has been set to “Primary” now.
On the contrary, the localhosts 2019 and 27020 are specified as secondary replicas.
Insert Data via Primary Server
When you see the next command area, it will display the “rs1” as “direct: primary”. The “show dbs” instruction has been displaying the built-in databases of a primary server.
Create a new “HositalDB” database via the “use” instruction.
After that, insert a collection on your primary server using the insertOne() function.
Fetch Data in Secondary Server
Run the MongoDB shell for the secondary server, which is located at localhost 27019, via the command prompt.
The database created in a primary server can be seen and used in this secondary server.
Use HospitalDB
Right now, the secondary server can’t read and write any record in the database i.e. as per the below outputs of “insertOne” and “find” function queries.
Let’s set the status of this server to “primary” to have read and write privileges. For this, use the “setReadPref()” function with the argument “primaryPreferred.”
After using the above instruction, you can search for the inserted documents.
Process Data in RealTime with Change Streams
The concept of change streams works in real time when you perform any operation on the primary server. For instance, we have been inserting another record in the “patients” collection using the insertOne() function in our primary server.
The Python program establishes a connection to a MongoDB database and uses change streams to monitor the patient’s collection for modifications continuously. The script first loads the MongoClient class from the Pymongo package for connecting to a MongoDB database. Every time a fresh record is added to the patient’s collection, the process_insert function is invoked. The main function initially establishes a connection to the local host port 27018 of the MongoDB database.
Then, it obtains the patient collection and the HospitalDB database. The process then produces a change stream for the group of patients. Since the change stream is set up with a blank pipeline, every modification to the collection will result in an emission. The try block initiates the change stream and cycles through the modifications. The program determines if the operation type is “insert” for each change. The application then uses the process_insert method to output the record to the console if it is.
Any errors that arise are caught and printed to the console by the except block. The change stream is finished with the “finally” block. This program will be called as a script, thanks to the “if __name__ == “__main__”:” declaration.
def process_insert(change):
print("Inserted document:", change["fullDocument"])
def main():
client = MongoClient("mongodb://localhost:27018")
db = client["HospitalDB"] collection = db["patients"] pipeline = [] # You can add aggregation stages to filter and process changes
change_stream = collection.watch(pipeline=pipeline)
try:
for change in change_stream:
if change["operationType"] == "insert":
process_insert(change)
except Exception as e:
print("An error occurred:", e)
finally:
change_stream.close()
if __name__ == "__main__":
main()
Execute the Python code file via the “python” command in Command Prompt. Every time a primary server user inserts data, it will be displayed on this CMD in real-time.
Conclusion
This guide has been illustrating every step to process data in real with the use of MongoDB Change Streams. We have started by configuring replica servers to insert records in the primary server and fetching them in the secondary replica servers. In the end, we used the Python stream code to perform real-time processing with Change Streams.
Source: linuxhint.com