Python Asynchronous API wrapper for Rocket.Chat
- From GitHub:
Clone our repository and
python3 setup.py install
import asyncio
from rocketchat_API.rocketchat import RocketChat
async def main():
rocket = RocketChat('user', 'pass', server_url='server_url')
if rocket.login_task:
await rocket.login_task
print('Login successful!')
# Use the methods to test
response = await rocket.chat_post_message('test message using asynchronous library!', room_id='room_id')
print(response.json())
asyncio.run(main())
note: every method returns a requests Response object.
If you are going to make a couple of request, you can user connection pooling provided by requests
. This will save significant time by avoiding re-negotiation of TLS (SSL) with the chat server on each call.
import asyncio
import httpx
from rocketchat_API.rocketchat import RocketChat
async def main():
async with httpx.AsyncClient() as client:
rocket = RocketChat('user', 'pass', server_url='server_url', client=client)
if rocket.login_task:
await rocket.login_task
print(rocket.channels_list().json())
print(rocket.channels_history('GENERAL', count=5).json())
asyncio.run(main())
We are actively testing :)
Tests run on a Rocket.Chat Docker container so install Docker and docker-compose.
- To start test server do
docker-compose up
and to take test server downdocker-compose down
- To run the tests run
pytest
You can contribute by doing Pull Requests. (It may take a while to merge your code but if it's good it will be merged). Please, try to implement tests for all your code and use a PEP8 compliant code style.
Reporting bugs and asking for features is also contributing ;) Feel free to help us grow by registering issues.