Greengrass Component IPC Subscription not receiving messages #407

Closed
cause-equals-time opened this issue Jul 20, 2023 · 12 comments
Labels
bug This issue is a bug.

cause-equals-time commented Jul 20, 2023

Describe the bug

Hi Team,

Thank you in advance for your time, and please forgive me if I am not following the guidelines; this is the first time I have opened an issue.

I followed the documentation here and tried to implement a custom PubSub Greengrass component to process messages at the edge. It successfully subscribes and publishes to IoT Core, and it publishes locally over IPC. However, subscribing to a local IPC topic does not work: there are no errors, but the subscription never receives any messages and the message event is never triggered.

I publish messages via the same IPC client and the MQTT bridge proxies them to IoT Core, but my component does not receive local messages via the stream operation.

Expected Behavior

I expected to receive the messages published to local IPC topics but I get nothing. No errors, no messages.

Current Behavior

My IPC client appears to be instantiated correctly, since it can publish locally and can publish and subscribe to IoT Core without any issues. However, when I subscribe locally to '#' (see code below), no error is raised and the message event never triggers.

See the logs below: the client is instantiated without issues and the subscriptions to both IoT Core and IPC do not throw any error, but the message I publish inside a setInterval() block does not trigger the message event:

2023-07-20T11:10:50.872Z [INFO] (Copier) genericPubSubComponent: stdout. Client ready. {scriptName=services.genericPubSubComponent.lifecycle.run.Script, serviceName=genericPubSubComponent, currentState=RUNNING}
2023-07-20T11:10:50.878Z [INFO] (Copier) genericPubSubComponent: stdout. [Local][IPC]_Subscribed to #. {scriptName=services.genericPubSubComponent.lifecycle.run.Script, serviceName=genericPubSubComponent, currentState=RUNNING}
2023-07-20T11:10:50.921Z [INFO] (Copier) genericPubSubComponent: stdout. [Iotcore]_Subscribed to iotcore2greengrass/#. {scriptName=services.genericPubSubComponent.lifecycle.run.Script, serviceName=genericPubSubComponent, currentState=RUNNING}
2023-07-20T11:11:20.932Z [INFO] (Copier) genericPubSubComponent: stdout. [Local][IPC]_Message published to local/self. {scriptName=services.genericPubSubComponent.lifecycle.run.Script, serviceName=genericPubSubComponent, currentState=RUNNING}
2023-07-20T11:11:50.959Z [INFO] (Copier) genericPubSubComponent: stdout. [Local][IPC]_Message published to local/self. {scriptName=services.genericPubSubComponent.lifecycle.run.Script, serviceName=genericPubSubComponent, currentState=RUNNING}

These messages published to local/self are picked up by the MQTT bridge and proxied to IoT Core, so publishing to IPC works; only the subscription does not.

Reproduction Steps

async subscribeToLocal(topic) {
    try {
        const subscribe_request = {
            topic: topic
        }
        await this.ipcClient.subscribeToTopic(subscribe_request).on('message', (message) => {
            console.log(JSON.stringify(message))
        }).on('streamError', (error) => {
            console.error('[Local][IPC]_Stream error:', error)
        }).on('ended', () => {
            console.log('[Local][IPC]_Streaming operation ended')
        }).activate()
        console.log(`[Local][IPC]_Subscribed to ${topic}`)
    } catch (error) {
        console.error(`[Local][IPC]_error subscribing: ${error}`)
    }
}


Possible Solution

Is there something wrong with my subscribing code?

Additional Information/Context

No response

SDK version used

1.14.0

Environment details (OS name and version, etc.)

ubuntu-jammy-22.04-amd64-server-20230516
greengrass nucleus 2.11.0

@cause-equals-time cause-equals-time added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Jul 20, 2023
@yasminetalby yasminetalby self-assigned this Jul 20, 2023
@bretambrose
Contributor

What does your component's accessControl block look like?

@cause-equals-time
Author

Hi, @bretambrose
Thank you for your reply, here is my component's accessControl:

"accessControl": {
		"aws.greengrass.ipc.mqttproxy": {
			"genericPubSubComponent:mqttproxy:1": {
				"policyDescription": "Allows access to publish to all topics.",
				"operations": [
					"aws.greengrass#PublishToTopic",
					"aws.greengrass#SubscribeToTopic",
					"aws.greengrass#PublishToIoTCore",
					"aws.greengrass#SubscribeToIoTCore"
				],
				"resources": [
					"*"
				]
			}
		}
	}

I also tried aws.greengrass.ipc.pubsub, but the result for the local subscription was the same, with the added problem that the component was unauthorized when trying to publish to IoT Core. Is there something wrong with the block?

@bretambrose
Contributor

bretambrose commented Jul 20, 2023

Still confirming with Greengrass, but I believe your policy is incorrect. You can only whitelist operations under blocks ("aws.greengrass.ipc.mqttproxy") that correspond to the associated set of operations.

For IoT Core MQTT operations, "aws.greengrass.ipc.mqttproxy" is correct, but for local data flows you'll want to use "aws.greengrass.ipc.pubsub". See the accessControl block in the authorization policy example at https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html
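
As a rough sketch of the split described above, the policy could be written with one entry per IPC service, each listing only its own operations. It is shown here as a JavaScript object that prints the JSON for the recipe. The service and operation names are the ones quoted in this thread; the policy IDs (`genericPubSubComponent:pubsub:1`, `genericPubSubComponent:mqttproxy:1`) and descriptions are illustrative assumptions, and `"*"` resources are kept only to mirror the original block.

```javascript
// Sketch: one accessControl entry per IPC service.
// Policy IDs and descriptions are placeholders chosen for illustration.
const accessControl = {
    // Local publish/subscribe between components on the core device
    'aws.greengrass.ipc.pubsub': {
        'genericPubSubComponent:pubsub:1': {
            policyDescription: 'Allows local publish/subscribe on all topics.',
            operations: [
                'aws.greengrass#PublishToTopic',
                'aws.greengrass#SubscribeToTopic'
            ],
            resources: ['*']
        }
    },
    // MQTT messaging to and from AWS IoT Core, proxied through Greengrass
    'aws.greengrass.ipc.mqttproxy': {
        'genericPubSubComponent:mqttproxy:1': {
            policyDescription: 'Allows publish/subscribe to IoT Core on all topics.',
            operations: [
                'aws.greengrass#PublishToIoTCore',
                'aws.greengrass#SubscribeToIoTCore'
            ],
            resources: ['*']
        }
    }
}

// Print the block in the JSON form used by a component recipe
console.log(JSON.stringify({ accessControl }, null, 2))
```

In a real deployment the `"*"` resources would normally be narrowed to the topics the component actually needs.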

@yasminetalby yasminetalby removed their assignment Jul 20, 2023
@yasminetalby yasminetalby removed the needs-triage This issue or PR still needs to be triaged. label Jul 20, 2023
@bretambrose
Contributor

Thinking further, though, you should have seen an exception when you tried to subscribe without permission. That is a bit puzzling and warrants further investigation (I constantly mess up permissions and get exceptions).

@MikeDombo
Contributor

Can you please provide the full code? You've shown the subscription, but not the publish.

@cause-equals-time
Author

@bretambrose

At first I was using "aws.greengrass.ipc.pubsub" and was only whitelisting "aws.greengrass#PublishToTopic" and "aws.greengrass#SubscribeToTopic", and I was still not getting any messages. When I ran into this issue, out of curiosity, I tried to publish and subscribe to IoT Core from my component, and that is when I changed the accessControl block to its current form.

Also, if "aws.greengrass.ipc.mqttproxy" only allows IoT Core MQTT operations, then what exactly can you proxy? And why is my component able to publish locally?

I have been searching everywhere for an answer. I didn't want to disturb you, but I am honestly at a loss right now, so before trying another language I decided to ask for your input.

@MikeDombo
Contributor

MikeDombo commented Jul 20, 2023

Greengrass local publish/subscribe and MQTT are different protocols. MQTT messages go to IoT Core; you do not use the MQTT Bridge component to do this. It is a "proxy" because your component does not connect to IoT Core directly; it goes through Greengrass to send the MQTT message to IoT Core.

What are you actually trying to do? Are you trying to publish and subscribe to messages within the local device, or to IoT Core?

From the documentation:

Publish/subscribe (pubsub) messaging enables you to send and receive messages to topics. Components can publish messages to topics to send messages to other components. Then, components that are subscribed to that topic can act on the messages that they receive.

The AWS IoT Core MQTT messaging IPC service lets you send and receive MQTT messages to and from AWS IoT Core. Components can publish messages to AWS IoT Core and subscribe to topics to act on MQTT messages from other sources. For more information about the AWS IoT Core implementation of MQTT, see MQTT in the AWS IoT Core Developer Guide.

@MikeDombo
Contributor

MikeDombo commented Jul 20, 2023

Because you are publishing from the same component that you're subscribing from, Greengrass is not delivering the message to you by default. This behavior only applies to local publish/subscribe, not MQTT.

You can change this behavior in the subscribe request by setting the receiveMode to RECEIVE_ALL_MESSAGES instead of RECEIVE_MESSAGES_FROM_OTHERS.

https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-subscribetotopic:~:text=(Optional)%20The%20behavior%20that%20specifies%20whether%20the%20component%20receives%20messages%20from%20itself.%20You%20can%20change%20this%20behavior%20to%20allow%20a%20component%20to%20act%20on%20its%20own%20messages.%20The%20default%20behavior%20depends%20on%20whether%20the%20topic%20contains%20an%20MQTT%20wildcard.%20Choose%20from%20the%20following%20options%3A
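
A minimal sketch of that change, assuming the request takes a `receiveMode` field whose value is one of the two mode names mentioned above. The helper `buildSubscribeRequest` is a hypothetical name introduced here for illustration; it only builds the request object, so it runs without the SDK or a Greengrass core device.

```javascript
// Receive mode names as given in this thread
const RECEIVE_ALL_MESSAGES = 'RECEIVE_ALL_MESSAGES'
const RECEIVE_MESSAGES_FROM_OTHERS = 'RECEIVE_MESSAGES_FROM_OTHERS'

// Hypothetical helper: build a SubscribeToTopic request and state the
// receive mode explicitly instead of relying on the default.
function buildSubscribeRequest(topic, includeOwnMessages) {
    return {
        topic: topic,
        receiveMode: includeOwnMessages ? RECEIVE_ALL_MESSAGES : RECEIVE_MESSAGES_FROM_OTHERS
    }
}

// Subscribe to every local topic, including messages this component publishes
const request = buildSubscribeRequest('#', true)
console.log(JSON.stringify(request))

// Inside the component, with a connected IPC client, the request would be
// passed the same way as in the original code:
// await ipcClient.subscribeToTopic(request).on('message', handler).activate()
```

Setting the mode explicitly, rather than depending on the default, keeps the behavior the same whether or not the topic contains a wildcard.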

@cause-equals-time
Author

Dear @MikeDombo here is the full code:

const greengrasscoreipc = require('aws-iot-device-sdk-v2/dist/greengrasscoreipc')

class IpcClient {
    ipcClient
    constructor() {
        this.ipcClient = null
    }
    async initialize() {
        try {
            this.ipcClient = greengrasscoreipc.createClient()
            await this.ipcClient.connect()
            console.log('Client ready')
        } catch (error) {
            console.error(`Error initializing client: ${error}`)
        }
    }
    async publishToLocal(topic, message) {
        try {
            const publish_request = {
                topic: topic,
                publishMessage: {
                    binaryMessage: {
                        message: JSON.stringify(message)
                    }
                }
            }
            await this.ipcClient.publishToTopic(publish_request)
            console.log(`[Local][IPC]_Message published to ${topic}`)
        } catch (error) {
            console.error(`[Local][IPC]_Error publishing: ${error}`)
        }
    }
    async subscribeToLocal(topic) {
        try {
            const subscribe_request = {
                topic: topic
            }
            await this.ipcClient.subscribeToTopic(subscribe_request, undefined).on('message', (message) => {
                console.log(JSON.stringify(message))
            }).on('streamError', (error) => {
                console.error('[Local][IPC]_Stream error:', error)
            }).on('ended', () => {
                console.log('[Local][IPC]_Streaming operation ended')
            }).activate()
            // Success message goes to stdout, matching the component logs above
            console.log(`[Local][IPC]_Subscribed to ${topic}`)
        } catch (error) {
            console.error(`[Local][IPC]_error subscribing: ${error}`)
        }
    }
    async publishToIoTCore(topic, message){
        try {
            await this.ipcClient.publishToIoTCore({
                topicName: topic,
                payload: message,
                qos : greengrasscoreipc.model.QOS.AT_LEAST_ONCE
            })
        } catch (error) {
            console.error(error)
        }
    }
    async subscribeToIoTCore(topic){

        try {
            await this.ipcClient.subscribeToIoTCore({
                topicName: topic,
                qos: greengrasscoreipc.model.QOS.AT_LEAST_ONCE
            }).on("message", (message) => {
                console.log(JSON.stringify(message))
            }).activate()
            console.log(`[Iotcore]_Subscribed to ${topic}`)
        } catch (error) {
            console.error(error)
        }
    }
}

async function main() {
    const ipcClient = new IpcClient()
    await ipcClient.initialize()
    await ipcClient.subscribeToLocal('#')
    await ipcClient.subscribeToIoTCore('iotcore2greengrass/#')
    setInterval(async () => {
            await ipcClient.publishToLocal('local/self', { origin: 'greengrass', time: `Test at ${Date.now()}` })
            //await ipcClient.publishToIoTCore('greengrass2iotcore/self', JSON.stringify({ origin: 'greengrass', time: `Test at ${Date.now()}` }))
    }, 30000)
    
}
main()

@cause-equals-time
Author

Dear @MikeDombo

What I am trying to do is the following:

I have a Greengrass client device that publishes to Moquette. From Moquette I proxy the messages to IPC via the MQTT bridge component, and I want my custom component to subscribe to the relevant topic, receive these messages, process them, and then send them on to IoT Core depending on the implemented logic (either by publishing to IoT Core via the ipcClient, or by publishing locally and having the MQTT bridge component proxy the messages to IoT Core).

It does look like you already pointed out what the issue is :)

@cause-equals-time
Author

Dear @MikeDombo and @bretambrose, thank you very much for the help. It was working all along. As Mike promptly pointed out, by default the local subscription ignores messages published by the same component, which erroneously led me to believe there was something wrong.

Thank you again, and forgive me for taking up your time.

Have a great day.
