COM-Server Library API
Functions
com_server.all_ports()
def all_ports(**kwargs)
Gets all ports from serial interface.
Gets ports from Serial interface by calling serial.tools.list_ports.comports().
See the pyserial documentation for more info.
Classes
com_server.Connection
Class that interfaces with the serial port.
Warning: Before making this object go out of scope, make sure to call disconnect() in order to avoid zombie threads.
If this does not happen, then the IO thread will still be running for an object that has already been deleted.
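One way to honor this warning is to pair connect() with disconnect() in a try/finally block. The sketch below is only illustrative: run_with_cleanup and work are hypothetical names, and conn is assumed to be any object that exposes the connect() and disconnect() methods documented on this page.

```python
def run_with_cleanup(conn, work):
    """Connect, run work(conn), and always disconnect afterwards.

    `conn` is assumed to expose connect() and disconnect() as documented
    for Connection; `work` is a hypothetical callable supplied by the caller.
    """
    conn.connect()
    try:
        return work(conn)
    finally:
        # Runs even if work() raises, so the IO thread is not left behind.
        conn.disconnect()
```

Using the object in a with statement, as described under __enter__() and __exit__(), should give the same guarantee with less code.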
Connection.__init__()
def __init__(baud, port, *ports, exception=True, timeout=1, send_interval=1, queue_size=256, exit_on_disconnect=True, rest_cpu=True, **kwargs)
Initializes the Connection class.
baud, port (or a port within ports), timeout, and kwargs will be passed to pyserial.
For more information, see the pyserial documentation.
Parameters:
- baud (int): The baud rate of the serial connection.
- port (str): The serial port.
- *ports: Alternative serial ports to choose if the first port does not work. The program will try the serial ports in order of arguments and will use the first one that works.
- timeout (float): How long the program should wait, in seconds, for serial data before exiting. By default 1.
- exception (bool): (DEPRECATED) Raise an exception when there is a user error in the methods rather than just returning. By default True.
- send_interval (float): Indicates how much time, in seconds, the program should wait before sending another message. Note that this does NOT mean that it will be able to send every send_interval seconds. It means that the send() method will exit if the interval has not reached send_interval seconds. NOT recommended to set to small values. By default 1.
- queue_size (int): The number of previous data that was received that the program should keep. Must be nonnegative. By default 256.
- exit_on_disconnect (bool): If True, sends SIGTERM signal to the main thread if the serial port is disconnected. Does NOT work on Windows. By default False.
- rest_cpu (bool): If True, will add 0.01 second delay to end of IO thread. Otherwise, removes those delays but will result in increased CPU usage. Not recommended to set False with the default IO thread. By default True.
- kwargs: Will be passed to pyserial.
Returns: nothing
Connection.__enter__()
def __enter__()
A context manager for the BaseConnection object.
When used in a with statement, the object automatically connects to its serial port and returns itself.
Connection.__exit__()
def __exit__(exc_type, exc_value, exc_tb)
A context manager for the BaseConnection object.
When exiting from the context manager, it automatically closes itself and exits from the threads it had created.
Connection.connect()
def connect()
Begins connection to the serial port.
When called, initializes a serial instance if not initialized already. Also starts the IO thread.
Parameters: None
Returns: None
Connection.disconnect()
def disconnect()
Closes connection to the serial port.
When called, calls Serial.close() then makes the connection None.
If it is currently closed then just returns.
Forces the IO thread to close.
NOTE: This method should be called if the object will not be used anymore or before the object goes out of scope, as deleting the object without calling this will lead to stray threads.
Parameters: None
Returns: None
Connection.send()
def send(*args, check_type=True, ending='\r\n', concatenate=' ')
Sends data to the port.
If the connection is open and enough time has passed since the last send,
this method concatenates args with a space (or the string given in concatenate) between them,
encodes the result to a UTF-8 bytes object, appends a carriage return and a newline (i.e. "\r\n", or the string given in ending), then sends it to the serial port.
Note that the data does not send immediately and instead will be added to a queue.
The queue size limit is 65536 bytes objects. Anything beyond that limit will not be added to the queue.
Sending data too rapidly (e.g. making send_interval too small, varies from computer to computer) is not recommended,
as the queue will get too large and the send data will get backed up and will be delayed,
since it takes a considerable amount of time for data to be sent through the serial port.
Additionally, parts of the send queue will be all sent together until it reaches 0.5 seconds,
which may end up with unexpected behavior in some programs.
To prevent these problems, either make the value of send_interval larger,
or add a delay within the main thread.
If the program has not waited long enough before sending, then the method will return False.
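The send interval rule can be pictured with a small stand-alone model. This is a sketch of the behavior described above, not the library's implementation; SendIntervalGate and its clock parameter are hypothetical names introduced only for illustration.

```python
import time


class SendIntervalGate:
    """Hypothetical model: allow a send only if send_interval has elapsed."""

    def __init__(self, send_interval, clock=time.monotonic):
        self.send_interval = send_interval
        self._clock = clock          # injectable so the gate is easy to test
        self._last_sent = None       # no send has happened yet

    def try_send(self):
        """Return True if a send is allowed now, False if it is too early."""
        now = self._clock()
        if self._last_sent is not None and now - self._last_sent < self.send_interval:
            return False             # mirrors send() returning early
        self._last_sent = now
        return True
```

A caller that gets False back can either wait and retry or drop the message, which is the same choice the main thread faces when send() fails because of the interval.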
If check_type is True, then it will process each argument, then concatenate, encode, and send.
- If the argument is bytes, then decodes to str
- If the argument is list or dict, then passes through json.dumps
- If the argument is set or tuple, then converts to list and passes through json.dumps
- Otherwise, directly converts to str and strips
If check_type is False, converts each argument directly to str and then concatenates, encodes, and sends.
Parameters:
- *args: Everything that is to be sent, each as a separate parameter. Must have at least one parameter.
- check_type (bool): If types in *args should be checked. By default True.
- ending (str): The ending of the bytes object to be sent through the serial port. By default a carriage return + newline ("\r\n").
- concatenate (str): What the strings in args should be concatenated by. By default a space (' ').
Returns:
- True on success (everything has been sent through)
- False on failure (not open, not waited long enough before sending, did not fully send through, etc.)
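The argument processing described for send() can be sketched in plain Python. The function below is an illustrative reconstruction of those rules, not the library's code: build_payload is a hypothetical name, and raising ValueError for an empty argument list is an assumption.

```python
import json


def build_payload(*args, check_type=True, ending="\r\n", concatenate=" "):
    """Hypothetical sketch of how send() is described to build its bytes."""
    if not args:
        # Assumption: the docs only say at least one argument is required.
        raise ValueError("at least one argument is required")
    parts = []
    for arg in args:
        if not check_type:
            parts.append(str(arg))                  # no per-type handling
        elif isinstance(arg, bytes):
            parts.append(arg.decode("utf-8"))       # bytes -> str
        elif isinstance(arg, (list, dict)):
            parts.append(json.dumps(arg))           # JSON-encode containers
        elif isinstance(arg, (set, tuple)):
            parts.append(json.dumps(list(arg)))     # convert to list first
        else:
            parts.append(str(arg).strip())          # everything else
    return (concatenate.join(parts) + ending).encode("utf-8")
```

For example, build_payload("led", b"on", [1, 2]) yields b"led on [1, 2]\r\n" under these rules.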
Connection.receive()
def receive(num_before=0)
Returns the most recent receive object
The IO thread will continuously detect receive data and put the bytes objects in the rcv_queue.
If there are no parameters, the method will return the most recent received data.
If num_before is greater than 0, then it will return the data received num_before items before the most recent one.
- Note: num_before must be less than the current size of the queue and greater than or equal to 0; if not, returns None (no data)
- Example:
- 0 will return the most recent received data
- 1 will return the 2nd most recent received data
- ...
Parameters:
- num_before (int): Which receive object to return. Must be nonnegative. By default 0.
Returns:
- A tuple representing the (timestamp received, data in bytes)
- None if no data was found or port not open
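The num_before indexing can be illustrated on an ordinary list. This is a sketch under the assumption that the receive queue stores (timestamp, bytes) tuples oldest first; pick_received is a hypothetical helper, not part of the library.

```python
def pick_received(rcv_queue, num_before=0):
    """Return the entry num_before places before the newest one, or None.

    Assumes rcv_queue is a list of (timestamp, bytes) tuples, oldest first.
    """
    if num_before < 0 or num_before >= len(rcv_queue):
        return None                      # out of range means "no data"
    return rcv_queue[-1 - num_before]    # 0 -> newest, 1 -> second newest
```

With three entries in the queue, num_before values 0, 1 and 2 are valid and anything larger returns None.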
Connection.conv_bytes_to_str()
def conv_bytes_to_str(rcv, read_until=None, strip=True)
Convert bytes receive object to a string.
Parameters:
- rcv (bytes): A bytes object. If None, then the method will return None.
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If there are multiple occurrences of read_until, then it will return the string that terminates with the first one. If read_until is None or it doesn't exist, then it will return the entire string. By default None.
- strip (bool): If True, then strips spaces and newlines from either side of the processed string before returning. If False, returns the processed string in its entirety. By default True.
Returns:
- A str representing the data
- None if rcv is None
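The read_until and strip options can be modeled in a few lines. This is a sketch of the documented behavior rather than the library's source; bytes_to_str is a hypothetical name, UTF-8 decoding is assumed, and str.strip() is used as a stand-in for "strips spaces and newlines".

```python
def bytes_to_str(rcv, read_until=None, strip=True):
    """Hypothetical model of the conversion described above."""
    if rcv is None:
        return None
    text = rcv.decode("utf-8")            # assumption: data is UTF-8
    if read_until is not None:
        end = text.find(read_until)       # first occurrence wins
        if end != -1:
            text = text[:end]             # exclude read_until itself
    return text.strip() if strip else text
```

Calling bytes_to_str(b"abcdefg123456\n", read_until="\n") reproduces the example from the parameter description and returns "abcdefg123456".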
Connection.get()
def get(given_type, read_until=None, strip=True)
Gets first response after this method is called.
This method differs from receive() because receive() returns
the last element of the receive buffer, which could contain objects
that were received before this function was called. This function
waits for something to be received after it is called until it either
gets the object or until the timeout is reached.
Parameters:
- given_type (type): either bytes or str, indicating which one to return. Will raise exception if type is invalid, REGARDLESS of self.exception. Example: get(str) or get(bytes).
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If there are multiple occurrences of read_until, then it will return the string that terminates with the first one. If read_until is None or it doesn't exist, then it will return the entire string. By default None.
- strip (bool): If True, then strips spaces and newlines from either side of the processed string before returning. If False, returns the processed string in its entirety. By default True.
Returns:
- None if no data received (timeout reached)
- A bytes object indicating the data received if given_type is bytes
- A str object indicating the data received, then passed through conv_bytes_to_str(), if given_type is str
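The difference between get() and receive() comes down to ignoring anything that arrived before the call. A rough model of that idea is a polling loop over a receive-style callable; first_after_call, receive and poll are hypothetical names, and the real method may be implemented quite differently.

```python
import time


def first_after_call(receive, timeout, poll=0.01):
    """Return the first data whose timestamp is later than this call, or None.

    `receive` is assumed to return a (timestamp, bytes) tuple or None,
    like the receive() method documented above.
    """
    started = time.time()
    while time.time() - started < timeout:
        item = receive()
        if item is not None and item[0] > started:
            return item[1]               # fresh data arrived after the call
        time.sleep(poll)                 # avoid spinning the CPU
    return None                          # timeout reached
```

Data that was already sitting in the queue before the call has an older timestamp, so it is skipped until something newer shows up or the timeout expires.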
Connection.get_all_rcv()
def get_all_rcv()
Returns the entire receive queue
The queue will be a queue_size-sized list that contains
tuples (timestamp received, received bytes).
Returns:
- A list of tuples indicating the timestamp received and the bytes object received
Connection.get_all_rcv_str()
def get_all_rcv_str(read_until=None, strip=True)
Returns entire receive queue as string.
Each bytes object will be passed into conv_bytes_to_str().
This means that read_until and strip will apply to
EVERY element in the receive queue before returning.
Parameters:
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If there are multiple occurrences of read_until, then it will return the string that terminates with the first one. If read_until is None or it doesn't exist, then it will return the entire string. By default None.
- strip (bool): If True, then strips spaces and newlines from either side of the processed string before returning. If False, returns the processed string in its entirety. By default True.
Returns:
- A list of tuples indicating the timestamp received and the converted string from bytes
Connection.get_first_response()
def get_first_response(*args, is_bytes=True, check_type=True, ending='\r\n', concatenate=' ', read_until=None, strip=True)
Gets the first response from the serial port after sending something.
This method works almost the same as send() (see self.send()).
It also returns a string representing the first response from the serial port after sending.
All *args and check_type, ending, and concatenate, will be sent to send().
If there is no response after reaching the timeout, then it breaks out of the method.
Parameters:
- *args: Everything that is to be sent, each as a separate parameter. Must have at least one parameter.
- is_bytes: If False, then passes to conv_bytes_to_str() and returns a string with given options read_until and strip. See conv_bytes_to_str() for more details. If True, then returns raw bytes data. By default True.
- check_type (bool): If types in *args should be checked. By default True.
- ending (str): The ending of the bytes object to be sent through the serial port. By default a carriage return + newline ("\r\n").
- concatenate (str): What the strings in args should be concatenated by. By default a space (' ').
These parameters only apply if is_bytes is False:
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If read_until is None, then it will return the entire string. By default None.
- strip (bool): If True, then strips the received and processed string of whitespace and newlines, then returns the result. If False, then returns the raw result. By default True.
Returns:
- A string or bytes representing the first response from the serial port.
- None if there was no connection (if self.exception == False), no data, timeout reached, or send interval not reached.
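Because None can mean several things here, including a send interval that has not yet elapsed, callers may want to retry. The helper below is a usage sketch only: query_with_retries and attempts are hypothetical, and conn is assumed to be an object exposing get_first_response() and the send_interval property as documented on this page.

```python
import time


def query_with_retries(conn, command, attempts=3):
    """Send `command` and return the first reply as a string, or None."""
    for _ in range(attempts):
        reply = conn.get_first_response(command, is_bytes=False, read_until="\n")
        if reply is not None:
            return reply
        # None may mean the send interval has not elapsed, so wait it out.
        time.sleep(conn.send_interval)
    return None
```

Passing is_bytes=False makes read_until and strip take effect, so the caller gets a clean string instead of raw bytes.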
Connection.wait_for_response()
def wait_for_response(response, after_timestamp=-1.0, read_until=None, strip=True)
Waits until the connection receives a given response.
This method will wait for a response that matches given response
whose time received is greater than given timestamp after_timestamp.
Parameters:
- response (str, bytes): The receive data that the program is looking for. If given a string, then compares the string to the response after it is decoded in utf-8. If given a bytes, then directly compares the bytes object to the response. If given anything else, converts to string.
- after_timestamp (float): Look for responses that came after given time as the UNIX timestamp. If negative, then converts to the time that the method was called, or time.time(). By default -1.0.
These parameters only apply if response is a string:
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If read_until is None, then it will return the entire string. By default None.
- strip (bool): If True, then strips the received and processed string of whitespace and newlines, then returns the result. If False, then returns the raw result. By default True.
Returns:
- True on success
- False on failure: timeout reached because response has not been received.
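A common pattern is to record the time just before sending and pass it as after_timestamp, so that a reply arriving between send() and wait_for_response() is not missed. The function below is a usage sketch; send_and_confirm is a hypothetical name and conn is assumed to expose send() and wait_for_response() as documented here.

```python
import time


def send_and_confirm(conn, command, expected):
    """Send `command`, then wait for `expected` to arrive after the send."""
    sent_at = time.time()                # timestamp taken before sending
    if not conn.send(command):
        return False                     # not open or send interval not reached
    return conn.wait_for_response(expected, after_timestamp=sent_at)
```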
Connection.send_for_response()
def send_for_response(response, *args, read_until=None, strip=True, check_type=True, ending='\r\n', concatenate=' ')
Continues sending something until the connection receives a given response.
This method will call send() and receive() repeatedly (calls again if does not match given response parameter).
See send() for more details on *args and check_type, ending, and concatenate, as these will be passed to the method.
Returns True on success and False on failure (timeout reached).
Parameters:
- response (str, bytes): The receive data that the program looks for after sending. If given a string, then compares the string to the response after it is decoded in utf-8. If given a bytes, then directly compares the bytes object to the response.
- *args: Everything that is to be sent, each as a separate parameter. Must have at least one parameter.
- check_type (bool): If types in *args should be checked. By default True.
- ending (str): The ending of the bytes object to be sent through the serial port. By default a carriage return + newline ("\r\n").
- concatenate (str): What the strings in args should be concatenated by. By default a space (' ').
These parameters only apply if response is a string:
- read_until (str, None): Will return a string that terminates with read_until, excluding read_until. For example, if the string was "abcdefg123456\n", and read_until was \n, then it will return "abcdefg123456". If read_until is None, then it will return the entire string. By default None.
- strip (bool): If True, then strips the received and processed string of whitespace and newlines, then returns the result. If False, then returns the raw result. By default True.
Returns:
- True on success: The incoming received data matched response.
- False on failure: Connection not established (if self.exception == False), incoming data did not match response, timeout was reached, or send interval has not been reached.
Connection.reconnect()
def reconnect(timeout=None)
Attempts to reconnect the serial port.
This method will continuously try to connect to the ports provided in __init__()
until it reaches given timeout seconds. If timeout is None, then it will
continuously try to reconnect indefinitely.
Will raise ConnectException if already connected, regardless
of whether exception is True.
Note that disconnecting the serial device will reset the receive and send queues.
Parameters:
- timeout (float, None): Will try to reconnect for timeout seconds before returning. If None, then will try to reconnect indefinitely. By default None.
Returns:
- True if able to reconnect
- False if not able to reconnect within given timeout
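Since reconnect() raises ConnectException when the port is already connected, it is reasonable to check the connected property first. The helper below is a usage sketch; ensure_connected is a hypothetical name and the 5 second default is arbitrary.

```python
def ensure_connected(conn, timeout=5.0):
    """Return True if `conn` is connected, reconnecting first if needed.

    `conn` is assumed to expose the `connected` property and reconnect()
    as documented on this page.
    """
    if conn.connected:
        return True                      # avoids ConnectException
    return conn.reconnect(timeout=timeout)
```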
Connection.all_ports()
def all_ports(**kwargs)
Lists all available serial ports.
Calls tools.all_ports(), which itself calls serial.tools.list_ports.comports().
For more information, see the pyserial documentation.
Parameters: See the pyserial documentation for comports().
Returns: A generator-like object (see the pyserial documentation)
Connection.custom_io_thread()
def custom_io_thread(func)
A decorator that registers a custom IO thread cycle to use instead of the default one.
It is recommended to read pyserial's documentation before creating a custom IO thread.
What the IO thread executes every 0.01 seconds will be referred to as a "cycle".
Note that this method should be called before connect() is called, or
else the thread will use the default cycle.
To see the default cycle, see the documentation of BaseConnection.
What the IO thread will do now is:
- Check if anything is using (reading from/writing to) the variables
- If not, copy the variables into a SendQueue and ReceiveQueue object.
- Call the custom_io_thread function (if none, calls the default cycle)
- Copy the results from the function back into the send queue and receive queue.
- Rest for 0.01 seconds to rest the CPU
The cycle should be in a function that this decorator will be on top of. The function should accept three parameters:
- conn (a serial.Serial object)
- rcv_queue (a ReceiveQueue object; see more on how to use it in its documentation)
- send_queue (a SendQueue object; see more on how to use it in its documentation)
To enable autocompletion on your text editor, you can add type hinting:
from com_server import Connection, SendQueue, ReceiveQueue
from serial import Serial

conn = Connection(...)

# some code

@conn.custom_io_thread
def custom_cycle(conn: Serial, rcv_queue: ReceiveQueue, send_queue: SendQueue):
    # code here
    pass

conn.connect() # call this AFTER custom_io_thread()

# more code
The function below the decorator should not return anything.
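As an illustration of what a cycle body might look like, the sketch below reads one line if data is waiting and writes the oldest queued message. It is not the library's default cycle. It relies only on pyserial's in_waiting, readline() and write(), and on the pushitems(), front() and pop() methods documented on this page; simple_cycle is a hypothetical name.

```python
def simple_cycle(conn, rcv_queue, send_queue):
    """One hypothetical IO cycle: receive a line, then send one message."""
    # Receive: if bytes are waiting, read one line and store it.
    if conn.in_waiting:
        incoming = conn.readline()       # may block up to the serial timeout
        if incoming:
            rcv_queue.pushitems(incoming)

    # Send: write the oldest queued message, if there is one.
    try:
        outgoing = send_queue.front()
    except IndexError:
        pass                             # nothing queued this cycle
    else:
        conn.write(outgoing)
        send_queue.pop()                 # remove it only after writing
```

To use it, the function would be placed under the @conn.custom_io_thread decorator before connect() is called. It returns nothing, in line with the requirement above.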
Connection.connected
A property to determine if the connection object is currently connected to a serial port or not. This also can determine if the IO thread for this object is currently running or not.
Getter:
- A bool indicating if the current connection is currently connected
Connection.timeout
A property to determine the timeout of this object.
Getter:
- Gets the timeout of this object.
Setter:
- Sets the timeout of this object after checking if convertible to nonnegative float. Then, sets the timeout to the same value on the pyserial object of this class. If the value is float('inf'), then sets the value of the pyserial object to None.
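The conversion performed by the setter can be sketched as a small function. This is an illustration of the rule described above, not the library's code; to_serial_timeout is a hypothetical name, and raising ValueError for negative input is an assumption.

```python
import math


def to_serial_timeout(value):
    """Map a timeout value to what would be stored on the pyserial object."""
    timeout = float(value)               # fails if not convertible to float
    if math.isnan(timeout) or timeout < 0:
        # Assumption: the docs do not say which error is raised here.
        raise ValueError("timeout must be a nonnegative number")
    # pyserial treats None as "wait forever", so infinity maps to None.
    return None if math.isinf(timeout) else timeout
```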
Connection.send_interval
A property to determine the send interval of this object.
Getter:
- Gets the send interval of this object.
Setter:
- Sets the send interval of this object after checking if convertible to nonnegative float.
Connection.conn_obj
A property to get the Serial object that handles sending and receiving.
Getter:
- Gets the Serial object.
Connection.available
A property indicating how much new data there is in the receive queue.
Getter:
- Gets the number of additional data received since the user last called the receive() method.
Connection.port
Returns the current port of the connection
Getter:
- Gets the current port of the connection
com_server.RestApiHandler
A handler for creating endpoints with the Connection and Connection-based objects.
This class provides the framework for adding custom endpoints for doing
custom things with the serial connection and running the local server
that will host the API. It uses a flask_restful object as its back end.
Note that endpoints cannot have the names /register or /recall.
Additionally, resource classes have to extend the custom ConnectionResource class
from this library, not the Resource from flask_restful.
500 Internal Server Errors will occur with endpoints dealing with the connection
if the serial port is disconnected. The server will spawn another thread that will
immediately try to reconnect the serial port if it is disconnected. However, note
that the receive and send queues will reset when the serial port is disconnected.
If another process accesses an endpoint while another is
currently being used, then it will respond with
503 Service Unavailable.
For more information, see the Flask and flask-restful documentation.
Register and recall endpoints:
- /register (GET): An endpoint to register an IP; other endpoints will result in 400 status code if they are accessed without accessing this first (unless has_register_recall is False); if an IP is already registered then this will result in 400; IPs must call this first before accessing serial port (unless has_register_recall is False)
- /recall (GET): After registered, can call /recall to "free" IP from server, allowing other IPs to call /register to use the serial port
RestApiHandler.__init__()
def __init__(conn, has_register_recall=True, add_cors=False, catch_all_404s=True, **kwargs)
Constructor for class
Parameters:
- conn (Connection): The Connection object the API is going to be associated with.
- has_register_recall (bool): If False, removes the /register and /recall endpoints so the user will not have to use them in order to access the other endpoints of the API. That is, visiting endpoints will not respond with a 400 status code even if /register was not accessed. By default True.
- add_cors (bool): If True, then the Flask app will have cross origin resource sharing enabled. By default False.
- catch_all_404s (bool): If True, then there will be JSON response for 404 errors. Otherwise, there will be a normal HTML response on 404. By default True.
- **kwargs: Will be passed to flask_restful.Api(). See the flask_restful documentation for more info.
RestApiHandler.add_endpoint()
def add_endpoint(endpoint)
Decorator that adds an endpoint
This decorator should go above a class that
extends ConnectionResource. The class should
contain implementations of request methods such as
get(), post(), etc. similar to the Resource
class from flask_restful. To use the connection
object, use the self.conn attribute of the class
under the decorator.
For more information, see the flask_restful documentation.
Note that duplicate endpoints will result in an exception.
If there are two classes of the same name, even in different
endpoints, the program will append underscores to the name
until there are no more repeats. For example, if one class is
named "Hello" and another class is also named "Hello",
then the second class name will be changed to "Hello_".
This happens because flask_restful interprets duplicate class
names as duplicate endpoints.
If another process accesses an endpoint while another is
currently being used, then it will respond with
503 Service Unavailable.
Parameters:
- endpoint: The endpoint to the resource. Cannot repeat. /register and /recall cannot be used, even if has_register_recall is False.
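The renaming rule for duplicate class names can be shown with a tiny helper. This sketch only models the behavior described above; unique_class_name and taken are hypothetical names, not part of the library.

```python
def unique_class_name(name, taken):
    """Append underscores to `name` until it is not in `taken`."""
    while name in taken:
        name += "_"
    return name
```

With "Hello" already taken, a second class named "Hello" becomes "Hello_", and a third becomes "Hello__".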
RestApiHandler.add_resource()
def add_resource(*args, **kwargs)
Calls flask_restful.add_resource.
Allows adding endpoints that do not interact with the serial port.
See the flask_restful documentation for more info on add_resource
and on flask_restful in general.
RestApiHandler.run_dev()
def run_dev(**kwargs)
Launches the Flask app as a development server.
All arguments in **kwargs will be passed to Flask.run().
For more information on Flask.run() and on Flask in general, see the Flask documentation.
Automatically disconnects the Connection object after
the server is closed.
Some arguments include:
- host: The host of the server. Ex: localhost, 0.0.0.0, 127.0.0.1, etc.
- port: The port to host it on. Ex: 5000 (default), 8000, 8080, etc.
- debug: If the app should be used in debug mode. Note that this may break some things because of reloads.
RestApiHandler.run_prod()
def run_prod(**kwargs)
Launches the Flask app as a Waitress production server.
All arguments in **kwargs will be passed to waitress.serve().
For more information on waitress.serve() and on Waitress in general, see the Waitress documentation.
If nothing is included, then runs on http://0.0.0.0:8080
Automatically disconnects the Connection object after
the server is closed.
RestApiHandler.flask_obj
Getter:
- Gets the Flask object that is the backend of the endpoints and the server.
This can be used to modify and customize the Flask object in this class.
RestApiHandler.api_obj
Getter:
- Gets the flask_restful API object that handles parsing the classes.
This can be used to modify and customize the Api object in this class.
com_server.ConnectionResource
A custom resource object that is built to be used with RestApiHandler.
This class is to be extended and used like the Resource class.
Have get(), post(), and other methods for the types of responses you need.
com_server.api
This is the server API module for the built-in endpoints of COM-Server.
See the Server API.
com_server.SendQueue
The send queue object.
This object is like a queue but cannot be iterated through.
It contains methods such as front() and pop(), just like
the queue data structure in C++. However, objects cannot
be added to it because objects should only be added through
the send() method.
Makes sure the user only reads and pops from send queue and does not directly add or delete anything from the queue.
SendQueue.__init__()
def __init__(send_queue)
Constructor for the send queue object.
Parameters:
- send_queue (list): The list that will act as the send queue
Returns:
- Nothing
SendQueue.front()
def front()
Returns the first element of the send queue.
Raises an IndexError if the length of the send queue is 0.
Parameters:
- None
Returns:
- The bytes object to send
SendQueue.pop()
def pop()
Removes the first index from the queue.
Raises an IndexError if the length of the send queue is 0.
Parameters:
- None
Returns:
- None
SendQueue.copy()
def copy()
Returns a shallow copy of the send queue list.
Using this to copy to a list may be dangerous, as
altering elements in the list may alter the elements
in the send queue itself. To prevent this, use the
deepcopy() method.
Parameters:
- None
Returns:
- A shallow copy of the send queue
SendQueue.deepcopy()
def deepcopy()
Returns a deepcopy of the send queue list.
By using this, you can modify the list without altering any elements of the actual send queue itself. However, it is a little more resource intensive.
Parameters:
- None
Returns:
- A deep copy of the send queue
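Because front() and pop() both raise IndexError on an empty queue, a loop that empties the queue can use that exception as its stop condition. The helper below is a usage sketch, for example inside a custom IO cycle; drain is a hypothetical name and send_queue is assumed to expose front() and pop() as documented above.

```python
def drain(send_queue):
    """Remove and return every pending item from a SendQueue-like object."""
    drained = []
    while True:
        try:
            item = send_queue.front()    # raises IndexError when empty
        except IndexError:
            break
        drained.append(item)
        send_queue.pop()                 # pop() returns None, so keep item first
    return drained
```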
com_server.ReceiveQueue
The ReceiveQueue object.
This object is a queue, but the user can only add bytes object(s) to it.
Makes sure the user does not directly add, delete, or modify the queue.
ReceiveQueue.__init__()
def __init__(rcv_queue, queue_size)
Constructor for the receive queue object.
Parameters:
- rcv_queue (list): The list that will act as the receive queue.
- queue_size (int): The maximum size of the receive queue
Returns:
- Nothing
ReceiveQueue.pushitems()
def pushitems(*args)
Adds a list of items to the receive queue.
All items in *args must be a bytes object. A
TypeError will be raised if not.
If the size exceeds queue_size when adding, then
it will pop the front of the queue.
A tuple (timestamp, bytes) will be added. The timestamp will be regenerated for each iteration of the for loop so they will be in order when binary searching.
Parameters:
*args: The bytes objects to add
Returns:
- Nothing
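The bounded push described above can be modeled on a plain list. This is a sketch, not the library's implementation: push_items is a hypothetical name, the queue is assumed to be a list ordered oldest first, and validating every argument before adding any of them is an assumption.

```python
import time


def push_items(rcv_queue, queue_size, *args):
    """Append (timestamp, bytes) tuples, dropping the oldest when full."""
    for item in args:
        if not isinstance(item, bytes):
            raise TypeError("all items must be bytes objects")
    for item in args:
        # A fresh timestamp per item keeps the list sorted by time.
        rcv_queue.append((time.time(), item))
        while len(rcv_queue) > queue_size:
            rcv_queue.pop(0)             # discard from the front
```

Keeping the timestamps in nondecreasing order is what makes a later binary search by timestamp possible.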
ReceiveQueue.copy()
def copy()
Returns a shallow copy of the receive queue list.
The receive queue list will be a list of tuples:
- (timestamp, bytes data)
Using this to copy to a list may be dangerous, as
altering elements in the list may alter the elements
in the receive queue itself. To prevent this, use the
deepcopy() method.
Parameters:
- None
Returns:
- A shallow copy of the receive queue
ReceiveQueue.deepcopy()
def deepcopy()
Returns a deepcopy of the receive queue.
The receive queue list will be a list of tuples:
- (timestamp, bytes data)
By using this, you can modify the list without altering any elements of the actual receive queue itself. However, it is a little more resource intensive.
Parameters:
- None
Returns:
- A deep copy of the receive queue
Constants
NO_TIMEOUT = float("inf")
Use this if you do not want a timeout. Not recommended.
NO_SEND_INTERVAL = 0
Use this if you do not want a send interval. Not recommended.
NORMAL_BAUD_RATE = 9600
FAST_BAUD_RATE = 115200
Standard baud rates that are commonly used.
NO_RCV_QUEUE = 1
RCV_QUEUE_SIZE_XSMALL = 32
RCV_QUEUE_SIZE_SMALL = 128
RCV_QUEUE_SIZE_NORMAL = 256
RCV_QUEUE_SIZE_LARGE = 512
RCV_QUEUE_SIZE_XLARGE = 1024
Different receive queue sizes for queue_size. Default is RCV_QUEUE_SIZE_NORMAL.
DEFAULT_HOST="0.0.0.0"
DEFAULT_PORT=8080
Default host and port for the server.
Exceptions
com_server.ConnectException
This exception is raised whenever a user tries to do an operation with the Connection class while it is disconnected, but the operation requires it to be connected, or vice versa.
com_server.EndpointExistsException
This exception is raised if the user tries to add a route to the RestApiHandler that already exists.